From 859f27040d0ce5358746c124edd10411ce94b895 Mon Sep 17 00:00:00 2001
From: Jian He
Date: Wed, 26 Mar 2014 03:48:46 +0000
Subject: [PATCH] Merge r1581678 from trunk. YARN-1521. Mark
Idempotent/AtMostOnce annotations to the APIs in ApplicationClientProtocol,
ResourceManagerAdministrationProtocol and ResourceTrackerProtocol so that
they work in an HA scenario. Contributed by Xuan Gong
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1581679 13f79535-47bb-0310-9956-ffa450edef68
---
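Background on the annotations: the failover retry logic in org.apache.hadoop.io.retry uses these method-level annotations to decide whether a call that may already have reached the old RM can safely be replayed against the newly active one. The snippet below is a minimal, hypothetical illustration of that check (it is not Hadoop's actual RetryInvocationHandler); only the two annotation types are the real ones this patch imports.

    import java.lang.reflect.Method;

    import org.apache.hadoop.io.retry.AtMostOnce;
    import org.apache.hadoop.io.retry.Idempotent;

    // Hypothetical helper, not Hadoop code: shows the decision that
    // @Idempotent/@AtMostOnce enable for a failover-aware proxy.
    final class FailoverRetryCheck {
      // A call may be replayed on the newly active RM only if re-executing it
      // (or having the server de-duplicate it) cannot corrupt state.
      static boolean safeToReplay(Method protocolMethod) {
        return protocolMethod.isAnnotationPresent(Idempotent.class)
            || protocolMethod.isAnnotationPresent(AtMostOnce.class);
      }
    }

With this patch, the ApplicationClientProtocol methods, the ResourceManagerAdministrationProtocol refresh/update methods, and both ResourceTracker methods annotated below would all pass such a check.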
hadoop-yarn-project/CHANGES.txt | 5 +
.../yarn/api/ApplicationClientProtocol.java | 20 +
...ResourceManagerAdministrationProtocol.java | 8 +
.../yarn/client/ProtocolHATestBase.java | 721 ++++++++++++++++++
.../TestApplicationClientProtocolOnHA.java | 211 +++++
.../yarn/client/TestResourceTrackerOnHA.java | 91 +++
.../yarn/server/api/ResourceTracker.java | 4 +
.../resourcemanager/ClientRMService.java | 13 +-
.../resourcemanager/rmapp/RMAppImpl.java | 2 +-
.../resourcemanager/TestClientRMService.java | 4 +-
.../TestSubmitApplicationWithRMHA.java | 85 +++
.../hadoop/yarn/server/MiniYARNCluster.java | 19 +-
12 files changed, 1162 insertions(+), 21 deletions(-)
create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java
create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestApplicationClientProtocolOnHA.java
create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceTrackerOnHA.java
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 53dea295163..f28bf085232 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -565,6 +565,11 @@ Release 2.4.0 - UNRELEASED
YARN-1867. Fixed a bug in ResourceManager that was causing invalid ACL checks
in the web-services after fail-over. (Vinod Kumar Vavilapalli)
+ YARN-1521. Mark Idempotent/AtMostOnce annotations to the APIs in
+ ApplicationClientProtocol, ResourceManagerAdministrationProtocol and
+ ResourceTrackerProtocol so that they work in an HA scenario. (Xuan Gong
+ via jianhe)
+
Release 2.3.1 - UNRELEASED
INCOMPATIBLE CHANGES
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationClientProtocol.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationClientProtocol.java
index 8db91517bea..e449c1ee3f2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationClientProtocol.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationClientProtocol.java
@@ -104,6 +104,7 @@ public interface ApplicationClientProtocol {
*/
@Public
@Stable
+ @Idempotent
public GetNewApplicationResponse getNewApplication(
GetNewApplicationRequest request)
throws YarnException, IOException;
@@ -133,6 +134,10 @@ public interface ApplicationClientProtocol {
* it encounters the {@link ApplicationNotFoundException} on the
* {@link #getApplicationReport(GetApplicationReportRequest)} call.
*
+ * During the submission process, it checks whether the application
+ * already exists. If it does, it will simply return a
+ * SubmitApplicationResponse.
+ *
* In secure mode, the ResourceManager verifies access to
* queues etc. before accepting the application submission.
*
@@ -147,6 +152,7 @@ public interface ApplicationClientProtocol {
*/
@Public
@Stable
+ @Idempotent
public SubmitApplicationResponse submitApplication(
SubmitApplicationRequest request)
throws YarnException, IOException;
@@ -173,6 +179,7 @@ public interface ApplicationClientProtocol {
*/
@Public
@Stable
+ @Idempotent
public KillApplicationResponse forceKillApplication(
KillApplicationRequest request)
throws YarnException, IOException;
@@ -231,6 +238,7 @@ public interface ApplicationClientProtocol {
*/
@Public
@Stable
+ @Idempotent
public GetClusterMetricsResponse getClusterMetrics(
GetClusterMetricsRequest request)
throws YarnException, IOException;
@@ -258,6 +266,7 @@ public interface ApplicationClientProtocol {
*/
@Public
@Stable
+ @Idempotent
public GetApplicationsResponse getApplications(
GetApplicationsRequest request)
throws YarnException, IOException;
@@ -277,6 +286,7 @@ public interface ApplicationClientProtocol {
*/
@Public
@Stable
+ @Idempotent
public GetClusterNodesResponse getClusterNodes(
GetClusterNodesRequest request)
throws YarnException, IOException;
@@ -298,6 +308,7 @@ public interface ApplicationClientProtocol {
*/
@Public
@Stable
+ @Idempotent
public GetQueueInfoResponse getQueueInfo(
GetQueueInfoRequest request)
throws YarnException, IOException;
@@ -317,6 +328,7 @@ public interface ApplicationClientProtocol {
*/
@Public
@Stable
+ @Idempotent
public GetQueueUserAclsInfoResponse getQueueUserAcls(
GetQueueUserAclsInfoRequest request)
throws YarnException, IOException;
@@ -335,6 +347,7 @@ public interface ApplicationClientProtocol {
*/
@Public
@Stable
+ @Idempotent
public GetDelegationTokenResponse getDelegationToken(
GetDelegationTokenRequest request)
throws YarnException, IOException;
@@ -349,6 +362,7 @@ public interface ApplicationClientProtocol {
*/
@Private
@Unstable
+ @Idempotent
public RenewDelegationTokenResponse renewDelegationToken(
RenewDelegationTokenRequest request) throws YarnException,
IOException;
@@ -363,6 +377,7 @@ public interface ApplicationClientProtocol {
*/
@Private
@Unstable
+ @Idempotent
public CancelDelegationTokenResponse cancelDelegationToken(
CancelDelegationTokenRequest request) throws YarnException,
IOException;
@@ -377,6 +392,7 @@ public interface ApplicationClientProtocol {
*/
@Public
@Unstable
+ @Idempotent
public MoveApplicationAcrossQueuesResponse moveApplicationAcrossQueues(
MoveApplicationAcrossQueuesRequest request) throws YarnException, IOException;
@@ -422,6 +438,7 @@ public interface ApplicationClientProtocol {
*/
@Public
@Unstable
+ @Idempotent
public GetApplicationAttemptReportResponse getApplicationAttemptReport(
GetApplicationAttemptReportRequest request) throws YarnException,
IOException;
@@ -453,6 +470,7 @@ public interface ApplicationClientProtocol {
*/
@Public
@Unstable
+ @Idempotent
public GetApplicationAttemptsResponse getApplicationAttempts(
GetApplicationAttemptsRequest request) throws YarnException, IOException;
@@ -486,6 +504,7 @@ public interface ApplicationClientProtocol {
*/
@Public
@Unstable
+ @Idempotent
public GetContainerReportResponse getContainerReport(
GetContainerReportRequest request) throws YarnException, IOException;
@@ -520,6 +539,7 @@ public interface ApplicationClientProtocol {
*/
@Public
@Unstable
+ @Idempotent
public GetContainersResponse getContainers(GetContainersRequest request)
throws YarnException, IOException;
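To see why marking submitApplication @Idempotent matters in practice, here is a hedged client-side sketch (ordinary YarnClient usage, assumed for illustration and not taken from this patch): if the RM fails over while the submit RPC is in flight, the retry proxy replays the call on the newly active RM, and with this change a replayed submission of an already-accepted application returns normally instead of being rejected as a duplicate.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.yarn.api.records.ApplicationId;
    import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
    import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
    import org.apache.hadoop.yarn.api.records.Resource;
    import org.apache.hadoop.yarn.client.api.YarnClient;
    import org.apache.hadoop.yarn.client.api.YarnClientApplication;
    import org.apache.hadoop.yarn.conf.YarnConfiguration;
    import org.apache.hadoop.yarn.util.Records;

    public class IdempotentSubmitSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new YarnConfiguration();
        YarnClient client = YarnClient.createYarnClient();
        client.init(conf);
        client.start();
        try {
          YarnClientApplication app = client.createApplication();
          ApplicationSubmissionContext ctx = app.getApplicationSubmissionContext();
          ctx.setApplicationName("idempotent-submit-demo");
          ctx.setAMContainerSpec(Records.newRecord(ContainerLaunchContext.class));
          Resource capability = Records.newRecord(Resource.class);
          capability.setMemory(128);
          capability.setVirtualCores(1);
          ctx.setResource(capability);
          // If a failover happens while this RPC is in flight, the proxy retries it
          // on the new active RM; a duplicate submission now simply returns a
          // SubmitApplicationResponse instead of "Cannot add a duplicate!".
          ApplicationId appId = client.submitApplication(ctx);
          System.out.println("Submitted " + appId);
        } finally {
          client.stop();
        }
      }
    }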
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/ResourceManagerAdministrationProtocol.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/ResourceManagerAdministrationProtocol.java
index 26415992ac0..4b777eaac5f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/ResourceManagerAdministrationProtocol.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/ResourceManagerAdministrationProtocol.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.classification.InterfaceStability.Stable;
+import org.apache.hadoop.io.retry.Idempotent;
import org.apache.hadoop.ipc.StandbyException;
import org.apache.hadoop.tools.GetUserMappingsProtocol;
import org.apache.hadoop.yarn.api.records.NodeId;
@@ -50,16 +51,19 @@ public interface ResourceManagerAdministrationProtocol extends GetUserMappingsPr
@Public
@Stable
+ @Idempotent
public RefreshQueuesResponse refreshQueues(RefreshQueuesRequest request)
throws StandbyException, YarnException, IOException;
@Public
@Stable
+ @Idempotent
public RefreshNodesResponse refreshNodes(RefreshNodesRequest request)
throws StandbyException, YarnException, IOException;
@Public
@Stable
+ @Idempotent
public RefreshSuperUserGroupsConfigurationResponse
refreshSuperUserGroupsConfiguration(
RefreshSuperUserGroupsConfigurationRequest request)
@@ -67,18 +71,21 @@ public interface ResourceManagerAdministrationProtocol extends GetUserMappingsPr
@Public
@Stable
+ @Idempotent
public RefreshUserToGroupsMappingsResponse refreshUserToGroupsMappings(
RefreshUserToGroupsMappingsRequest request)
throws StandbyException, YarnException, IOException;
@Public
@Stable
+ @Idempotent
public RefreshAdminAclsResponse refreshAdminAcls(
RefreshAdminAclsRequest request)
throws YarnException, IOException;
@Public
@Stable
+ @Idempotent
public RefreshServiceAclsResponse refreshServiceAcls(
RefreshServiceAclsRequest request)
throws YarnException, IOException;
@@ -99,6 +106,7 @@ public interface ResourceManagerAdministrationProtocol extends GetUserMappingsPr
*/
@Public
@Evolving
+ @Idempotent
public UpdateNodeResourceResponse updateNodeResource(
UpdateNodeResourceRequest request)
throws YarnException, IOException;
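Likewise for the admin protocol, a hedged sketch of a caller that benefits (creating the proxy through ClientRMProxy mirrors what the rmadmin CLI does, but that choice and the class name below are assumptions for illustration, not part of this patch):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.yarn.client.ClientRMProxy;
    import org.apache.hadoop.yarn.conf.YarnConfiguration;
    import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol;
    import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesRequest;
    import org.apache.hadoop.yarn.util.Records;

    public class RefreshNodesOnHaSketch {
      public static void main(String[] args) throws Exception {
        // Failover behaviour comes from the yarn.resourcemanager.ha.* settings in conf.
        Configuration conf = new YarnConfiguration();
        ResourceManagerAdministrationProtocol admin = ClientRMProxy.createRMProxy(
            conf, ResourceManagerAdministrationProtocol.class);
        // If the active RM goes down mid-call, the proxy can fail over and re-issue
        // the request; refreshNodes being @Idempotent makes the replay harmless.
        admin.refreshNodes(Records.newRecord(RefreshNodesRequest.class));
      }
    }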
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java
new file mode 100644
index 00000000000..4f3cab2c327
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java
@@ -0,0 +1,721 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.client;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ha.ClientBaseWithFixes;
+import org.apache.hadoop.ha.HAServiceProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainersRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainersResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerReport;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.api.records.QueueACL;
+import org.apache.hadoop.yarn.api.records.QueueInfo;
+import org.apache.hadoop.yarn.api.records.QueueState;
+import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
+import org.apache.hadoop.yarn.api.records.Token;
+import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.conf.HAUtil;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.MiniYARNCluster;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
+import org.apache.hadoop.yarn.server.resourcemanager.AdminService;
+import org.apache.hadoop.yarn.server.resourcemanager.ClientRMService;
+import org.apache.hadoop.yarn.server.resourcemanager.NMLivelinessMonitor;
+import org.apache.hadoop.yarn.server.resourcemanager.NodesListManager;
+import org.apache.hadoop.yarn.server.resourcemanager.RMAppManager;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
+import org.apache.hadoop.yarn.server.resourcemanager.security.QueueACLsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
+import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager;
+import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
+import org.apache.hadoop.yarn.util.Records;
+import org.junit.After;
+import org.junit.Before;
+
+
+public abstract class ProtocolHATestBase extends ClientBaseWithFixes{
+ protected static final HAServiceProtocol.StateChangeRequestInfo req =
+ new HAServiceProtocol.StateChangeRequestInfo(
+ HAServiceProtocol.RequestSource.REQUEST_BY_USER);
+
+ protected static final String RM1_NODE_ID = "rm1";
+ protected static final int RM1_PORT_BASE = 10000;
+ protected static final String RM2_NODE_ID = "rm2";
+ protected static final int RM2_PORT_BASE = 20000;
+
+ protected Configuration conf;
+ protected MiniYARNClusterForHATesting cluster;
+
+ protected Thread failoverThread = null;
+ private volatile boolean keepRunning;
+
+ private void setConfForRM(String rmId, String prefix, String value) {
+ conf.set(HAUtil.addSuffix(prefix, rmId), value);
+ }
+
+ private void setRpcAddressForRM(String rmId, int base) {
+ setConfForRM(rmId, YarnConfiguration.RM_ADDRESS, "0.0.0.0:" +
+ (base + YarnConfiguration.DEFAULT_RM_PORT));
+ setConfForRM(rmId, YarnConfiguration.RM_SCHEDULER_ADDRESS, "0.0.0.0:" +
+ (base + YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT));
+ setConfForRM(rmId, YarnConfiguration.RM_ADMIN_ADDRESS, "0.0.0.0:" +
+ (base + YarnConfiguration.DEFAULT_RM_ADMIN_PORT));
+ setConfForRM(rmId, YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS,
+ "0.0.0.0:" + (base + YarnConfiguration
+ .DEFAULT_RM_RESOURCE_TRACKER_PORT));
+ setConfForRM(rmId, YarnConfiguration.RM_WEBAPP_ADDRESS, "0.0.0.0:" +
+ (base + YarnConfiguration.DEFAULT_RM_WEBAPP_PORT));
+ setConfForRM(rmId, YarnConfiguration.RM_WEBAPP_HTTPS_ADDRESS, "0.0.0.0:" +
+ (base + YarnConfiguration.DEFAULT_RM_WEBAPP_HTTPS_PORT));
+ }
+
+ @Before
+ public void setup() throws IOException {
+ failoverThread = null;
+ keepRunning = true;
+ conf = new YarnConfiguration();
+ conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
+ conf.setInt(YarnConfiguration.CLIENT_FAILOVER_MAX_ATTEMPTS, 5);
+ conf.set(YarnConfiguration.RM_HA_IDS, RM1_NODE_ID + "," + RM2_NODE_ID);
+ setRpcAddressForRM(RM1_NODE_ID, RM1_PORT_BASE);
+ setRpcAddressForRM(RM2_NODE_ID, RM2_PORT_BASE);
+
+ conf.setLong(YarnConfiguration.CLIENT_FAILOVER_SLEEPTIME_BASE_MS, 100L);
+
+ conf.setBoolean(YarnConfiguration.YARN_MINICLUSTER_FIXED_PORTS, true);
+ conf.setBoolean(YarnConfiguration.YARN_MINICLUSTER_USE_RPC, true);
+ }
+
+ @After
+ public void teardown() throws Exception {
+ keepRunning = false;
+ if (failoverThread != null) {
+ failoverThread.interrupt();
+ try {
+ failoverThread.join();
+ } catch (InterruptedException ex) {
+ LOG.error("Error joining with failover thread", ex);
+ }
+ }
+ cluster.stop();
+ }
+
+ protected AdminService getAdminService(int index) {
+ return cluster.getResourceManager(index).getRMContext()
+ .getRMAdminService();
+ }
+
+ protected void explicitFailover() throws IOException {
+ int activeRMIndex = cluster.getActiveRMIndex();
+ int newActiveRMIndex = (activeRMIndex + 1) % 2;
+ getAdminService(activeRMIndex).transitionToStandby(req);
+ getAdminService(newActiveRMIndex).transitionToActive(req);
+ assertEquals("Failover failed", newActiveRMIndex,
+ cluster.getActiveRMIndex());
+ }
+
+ protected YarnClient createAndStartYarnClient(Configuration conf) {
+ Configuration configuration = new YarnConfiguration(conf);
+ YarnClient client = YarnClient.createYarnClient();
+ client.init(configuration);
+ client.start();
+ return client;
+ }
+
+ protected void verifyConnections() throws InterruptedException,
+ YarnException {
+ assertTrue("NMs failed to connect to the RM",
+ cluster.waitForNodeManagersToConnect(20000));
+ verifyClientConnection();
+ }
+
+ protected void verifyClientConnection() {
+ int numRetries = 3;
+ while(numRetries-- > 0) {
+ Configuration conf = new YarnConfiguration(this.conf);
+ YarnClient client = createAndStartYarnClient(conf);
+ try {
+ Thread.sleep(100);
+ client.getApplications();
+ return;
+ } catch (Exception e) {
+ LOG.error(e.getMessage());
+ } finally {
+ client.stop();
+ }
+ }
+ fail("Client couldn't connect to the Active RM");
+ }
+
+ protected Thread createAndStartFailoverThread() {
+ Thread failoverThread = new Thread() {
+ public void run() {
+ keepRunning = true;
+ while (keepRunning) {
+ if (cluster.getStartFailoverFlag()) {
+ try {
+ explicitFailover();
+ keepRunning = false;
+ cluster.resetFailoverTriggeredFlag(true);
+ } catch (Exception e) {
+ // Do Nothing
+ } finally {
+ keepRunning = false;
+ }
+ }
+ try {
+ Thread.sleep(50);
+ } catch (InterruptedException e) {
+ // DO NOTHING
+ }
+ }
+ }
+ };
+ failoverThread.start();
+ return failoverThread;
+ }
+
+ protected void startHACluster(int numOfNMs, boolean overrideClientRMService,
+ boolean overrideRTS) throws Exception {
+ conf.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false);
+ cluster =
+ new MiniYARNClusterForHATesting(TestRMFailover.class.getName(), 2,
+ numOfNMs, 1, 1, false, overrideClientRMService, overrideRTS);
+ cluster.resetStartFailoverFlag(false);
+ cluster.init(conf);
+ cluster.start();
+ getAdminService(0).transitionToActive(req);
+ assertFalse("RM never turned active", -1 == cluster.getActiveRMIndex());
+ verifyConnections();
+
+ // Do the failover
+ explicitFailover();
+ verifyConnections();
+
+ failoverThread = createAndStartFailoverThread();
+
+ }
+
+ protected ResourceManager getActiveRM() {
+ return cluster.getResourceManager(cluster.getActiveRMIndex());
+ }
+
+ public class MiniYARNClusterForHATesting extends MiniYARNCluster {
+
+ private boolean overrideClientRMService;
+ private boolean overrideRTS;
+ private final AtomicBoolean startFailover = new AtomicBoolean(false);
+ private final AtomicBoolean failoverTriggered = new AtomicBoolean(false);
+
+ public MiniYARNClusterForHATesting(String testName,
+ int numResourceManagers, int numNodeManagers, int numLocalDirs,
+ int numLogDirs, boolean enableAHS, boolean overrideClientRMService,
+ boolean overrideRTS) {
+ super(testName, numResourceManagers, numNodeManagers, numLocalDirs,
+ numLogDirs, enableAHS);
+ this.overrideClientRMService = overrideClientRMService;
+ this.overrideRTS = overrideRTS;
+ }
+
+ public boolean getStartFailoverFlag() {
+ return startFailover.get();
+ }
+
+ public void resetStartFailoverFlag(boolean flag) {
+ startFailover.set(flag);
+ }
+
+ public void resetFailoverTriggeredFlag(boolean flag) {
+ failoverTriggered.set(flag);
+ }
+
+ private boolean waittingForFailOver() {
+ int maximumWaittingTime = 50;
+ int count = 0;
+ while (!failoverTriggered.get() && count <= maximumWaittingTime) {
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ // DO NOTHING
+ }
+ count++;
+ }
+ if (count >= maximumWaittingTime) {
+ return false;
+ }
+ return true;
+ }
+
+ @Override
+ protected ResourceManager createResourceManager() {
+ return new ResourceManager() {
+ @Override
+ protected void doSecureLogin() throws IOException {
+ // Don't try to login using keytab in the testcases.
+ }
+ @Override
+ protected ClientRMService createClientRMService() {
+ if (overrideClientRMService) {
+ return new CustomedClientRMService(this.rmContext, this.scheduler,
+ this.rmAppManager, this.applicationACLsManager,
+ this.queueACLsManager,
+ this.rmContext.getRMDelegationTokenSecretManager());
+ }
+ return super.createClientRMService();
+ }
+ @Override
+ protected ResourceTrackerService createResourceTrackerService() {
+ if (overrideRTS) {
+ return new CustomedResourceTrackerService(this.rmContext,
+ this.nodesListManager, this.nmLivelinessMonitor,
+ this.rmContext.getContainerTokenSecretManager(),
+ this.rmContext.getNMTokenSecretManager());
+ }
+ return super.createResourceTrackerService();
+ }
+ };
+ }
+
+ private class CustomedClientRMService extends ClientRMService {
+ public CustomedClientRMService(RMContext rmContext,
+ YarnScheduler scheduler, RMAppManager rmAppManager,
+ ApplicationACLsManager applicationACLsManager,
+ QueueACLsManager queueACLsManager,
+ RMDelegationTokenSecretManager rmDTSecretManager) {
+ super(rmContext, scheduler, rmAppManager, applicationACLsManager,
+ queueACLsManager, rmDTSecretManager);
+ }
+
+ @Override
+ public GetNewApplicationResponse getNewApplication(
+ GetNewApplicationRequest request) throws YarnException {
+ resetStartFailoverFlag(true);
+
+ // make sure failover has been triggered
+ Assert.assertTrue(waittingForFailOver());
+
+ // create the GetNewApplicationResponse with fake applicationId
+ GetNewApplicationResponse response =
+ GetNewApplicationResponse.newInstance(
+ createFakeAppId(), null, null);
+ return response;
+ }
+
+ @Override
+ public GetApplicationReportResponse getApplicationReport(
+ GetApplicationReportRequest request) throws YarnException {
+ resetStartFailoverFlag(true);
+
+ // make sure failover has been triggered
+ Assert.assertTrue(waittingForFailOver());
+
+ // create a fake application report
+ ApplicationReport report = createFakeAppReport();
+ GetApplicationReportResponse response =
+ GetApplicationReportResponse.newInstance(report);
+ return response;
+ }
+
+ @Override
+ public GetClusterMetricsResponse getClusterMetrics(
+ GetClusterMetricsRequest request) throws YarnException {
+ resetStartFailoverFlag(true);
+
+ // make sure failover has been triggered
+ Assert.assertTrue(waittingForFailOver());
+
+ // create GetClusterMetricsResponse with fake YarnClusterMetrics
+ GetClusterMetricsResponse response =
+ GetClusterMetricsResponse.newInstance(
+ createFakeYarnClusterMetrics());
+ return response;
+ }
+
+ @Override
+ public GetApplicationsResponse getApplications(
+ GetApplicationsRequest request) throws YarnException {
+ resetStartFailoverFlag(true);
+
+ // make sure failover has been triggered
+ Assert.assertTrue(waittingForFailOver());
+
+ // create GetApplicationsResponse with fake applicationList
+ GetApplicationsResponse response =
+ GetApplicationsResponse.newInstance(createFakeAppReports());
+ return response;
+ }
+
+ @Override
+ public GetClusterNodesResponse getClusterNodes(
+ GetClusterNodesRequest request)
+ throws YarnException {
+ resetStartFailoverFlag(true);
+
+ // make sure failover has been triggered
+ Assert.assertTrue(waittingForFailOver());
+
+ // create GetClusterNodesResponse with fake ClusterNodeLists
+ GetClusterNodesResponse response =
+ GetClusterNodesResponse.newInstance(createFakeNodeReports());
+ return response;
+ }
+
+ @Override
+ public GetQueueInfoResponse getQueueInfo(GetQueueInfoRequest request)
+ throws YarnException {
+ resetStartFailoverFlag(true);
+
+ // make sure failover has been triggered
+ Assert.assertTrue(waittingForFailOver());
+
+ // return fake QueueInfo
+ return GetQueueInfoResponse.newInstance(createFakeQueueInfo());
+ }
+
+ @Override
+ public GetQueueUserAclsInfoResponse getQueueUserAcls(
+ GetQueueUserAclsInfoRequest request) throws YarnException {
+ resetStartFailoverFlag(true);
+
+ // make sure failover has been triggered
+ Assert.assertTrue(waittingForFailOver());
+
+ // return fake queueUserAcls
+ return GetQueueUserAclsInfoResponse
+ .newInstance(createFakeQueueUserACLInfoList());
+ }
+
+ @Override
+ public GetApplicationAttemptReportResponse getApplicationAttemptReport(
+ GetApplicationAttemptReportRequest request) throws YarnException,
+ IOException {
+ resetStartFailoverFlag(true);
+
+ // make sure failover has been triggered
+ Assert.assertTrue(waittingForFailOver());
+
+ // return fake ApplicationAttemptReport
+ return GetApplicationAttemptReportResponse
+ .newInstance(createFakeApplicationAttemptReport());
+ }
+
+ @Override
+ public GetApplicationAttemptsResponse getApplicationAttempts(
+ GetApplicationAttemptsRequest request) throws YarnException,
+ IOException {
+ resetStartFailoverFlag(true);
+
+ // make sure failover has been triggered
+ Assert.assertTrue(waittingForFailOver());
+
+ // return fake ApplicationAttemptReports
+ return GetApplicationAttemptsResponse
+ .newInstance(createFakeApplicationAttemptReports());
+ }
+
+ @Override
+ public GetContainerReportResponse getContainerReport(
+ GetContainerReportRequest request) throws YarnException,
+ IOException {
+ resetStartFailoverFlag(true);
+
+ // make sure failover has been triggered
+ Assert.assertTrue(waittingForFailOver());
+
+ // return fake containerReport
+ return GetContainerReportResponse
+ .newInstance(createFakeContainerReport());
+ }
+
+ @Override
+ public GetContainersResponse getContainers(GetContainersRequest request)
+ throws YarnException, IOException {
+ resetStartFailoverFlag(true);
+
+ // make sure failover has been triggered
+ Assert.assertTrue(waittingForFailOver());
+
+ // return fake ContainerReports
+ return GetContainersResponse.newInstance(createFakeContainerReports());
+ }
+
+ @Override
+ public SubmitApplicationResponse submitApplication(
+ SubmitApplicationRequest request) throws YarnException {
+ resetStartFailoverFlag(true);
+
+ // make sure failover has been triggered
+ Assert.assertTrue(waittingForFailOver());
+
+ return super.submitApplication(request);
+ }
+
+ @Override
+ public KillApplicationResponse forceKillApplication(
+ KillApplicationRequest request) throws YarnException {
+ resetStartFailoverFlag(true);
+
+ // make sure failover has been triggered
+ Assert.assertTrue(waittingForFailOver());
+
+ return KillApplicationResponse.newInstance(true);
+ }
+
+ @Override
+ public MoveApplicationAcrossQueuesResponse moveApplicationAcrossQueues(
+ MoveApplicationAcrossQueuesRequest request) throws YarnException {
+ resetStartFailoverFlag(true);
+
+ // make sure failover has been triggered
+ Assert.assertTrue(waittingForFailOver());
+
+ return Records.newRecord(MoveApplicationAcrossQueuesResponse.class);
+ }
+
+ @Override
+ public GetDelegationTokenResponse getDelegationToken(
+ GetDelegationTokenRequest request) throws YarnException {
+ resetStartFailoverFlag(true);
+
+ // make sure failover has been triggered
+ Assert.assertTrue(waittingForFailOver());
+
+ return GetDelegationTokenResponse.newInstance(createFakeToken());
+ }
+
+ @Override
+ public RenewDelegationTokenResponse renewDelegationToken(
+ RenewDelegationTokenRequest request) throws YarnException {
+ resetStartFailoverFlag(true);
+
+ // make sure failover has been triggered
+ Assert.assertTrue(waittingForFailOver());
+
+ return RenewDelegationTokenResponse
+ .newInstance(createNextExpirationTime());
+ }
+
+ @Override
+ public CancelDelegationTokenResponse cancelDelegationToken(
+ CancelDelegationTokenRequest request) throws YarnException {
+ resetStartFailoverFlag(true);
+
+ // make sure failover has been triggered
+ Assert.assertTrue(waittingForFailOver());
+
+ return CancelDelegationTokenResponse.newInstance();
+ }
+ }
+
+ public ApplicationReport createFakeAppReport() {
+ ApplicationId appId = ApplicationId.newInstance(1000l, 1);
+ ApplicationAttemptId attemptId =
+ ApplicationAttemptId.newInstance(appId, 1);
+ // create a fake application report
+ ApplicationReport report =
+ ApplicationReport.newInstance(appId, attemptId, "fakeUser",
+ "fakeQueue", "fakeApplicationName", "localhost", 0, null,
+ YarnApplicationState.FAILED, "fake an application report", "",
+ 1000l, 1200l, FinalApplicationStatus.FAILED, null, "", 50f,
+ "fakeApplicationType", null);
+ return report;
+ }
+
+ public List<ApplicationReport> createFakeAppReports() {
+ List<ApplicationReport> reports = new ArrayList<ApplicationReport>();
+ reports.add(createFakeAppReport());
+ return reports;
+ }
+
+ public ApplicationId createFakeAppId() {
+ return ApplicationId.newInstance(1000l, 1);
+ }
+
+ public ApplicationAttemptId createFakeApplicationAttemptId() {
+ return ApplicationAttemptId.newInstance(createFakeAppId(), 0);
+ }
+
+ public ContainerId createFakeContainerId() {
+ return ContainerId.newInstance(createFakeApplicationAttemptId(), 0);
+ }
+
+ public YarnClusterMetrics createFakeYarnClusterMetrics() {
+ return YarnClusterMetrics.newInstance(1);
+ }
+
+ public List<NodeReport> createFakeNodeReports() {
+ NodeId nodeId = NodeId.newInstance("localhost", 0);
+ NodeReport report =
+ NodeReport.newInstance(nodeId, NodeState.RUNNING, "localhost",
+ "rack1", null, null, 4, null, 1000l);
+ List<NodeReport> reports = new ArrayList<NodeReport>();
+ reports.add(report);
+ return reports;
+ }
+
+ public QueueInfo createFakeQueueInfo() {
+ return QueueInfo.newInstance("root", 100f, 100f, 50f, null,
+ createFakeAppReports(), QueueState.RUNNING);
+ }
+
+ public List<QueueUserACLInfo> createFakeQueueUserACLInfoList() {
+ List<QueueACL> queueACL = new ArrayList<QueueACL>();
+ queueACL.add(QueueACL.SUBMIT_APPLICATIONS);
+ QueueUserACLInfo info = QueueUserACLInfo.newInstance("root", queueACL);
+ List<QueueUserACLInfo> infos = new ArrayList<QueueUserACLInfo>();
+ infos.add(info);
+ return infos;
+ }
+
+ public ApplicationAttemptReport createFakeApplicationAttemptReport() {
+ return ApplicationAttemptReport.newInstance(
+ createFakeApplicationAttemptId(), "localhost", 0, "", "",
+ YarnApplicationAttemptState.RUNNING, createFakeContainerId());
+ }
+
+ public List<ApplicationAttemptReport>
+ createFakeApplicationAttemptReports() {
+ List<ApplicationAttemptReport> reports =
+ new ArrayList<ApplicationAttemptReport>();
+ reports.add(createFakeApplicationAttemptReport());
+ return reports;
+ }
+
+ public ContainerReport createFakeContainerReport() {
+ return ContainerReport.newInstance(createFakeContainerId(), null,
+ NodeId.newInstance("localhost", 0), null, 1000l, 1200l, "", "", 0,
+ ContainerState.COMPLETE);
+ }
+
+ public List<ContainerReport> createFakeContainerReports() {
+ List<ContainerReport> reports =
+ new ArrayList<ContainerReport>();
+ reports.add(createFakeContainerReport());
+ return reports;
+ }
+
+ public Token createFakeToken() {
+ String identifier = "fake Token";
+ String password = "fake token passwd";
+ Token token = Token.newInstance(
+ identifier.getBytes(), " ", password.getBytes(), " ");
+ return token;
+ }
+
+ public long createNextExpirationTime() {
+ return "fake Token".getBytes().length;
+ }
+
+ private class CustomedResourceTrackerService extends
+ ResourceTrackerService {
+ public CustomedResourceTrackerService(RMContext rmContext,
+ NodesListManager nodesListManager,
+ NMLivelinessMonitor nmLivelinessMonitor,
+ RMContainerTokenSecretManager containerTokenSecretManager,
+ NMTokenSecretManagerInRM nmTokenSecretManager) {
+ super(rmContext, nodesListManager, nmLivelinessMonitor,
+ containerTokenSecretManager, nmTokenSecretManager);
+ }
+
+ @Override
+ public RegisterNodeManagerResponse registerNodeManager(
+ RegisterNodeManagerRequest request) throws YarnException,
+ IOException {
+ resetStartFailoverFlag(true);
+ // make sure failover has been triggered
+ Assert.assertTrue(waittingForFailOver());
+ return super.registerNodeManager(request);
+ }
+
+ @Override
+ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
+ throws YarnException, IOException {
+ resetStartFailoverFlag(true);
+ // make sure failover has been triggered
+ Assert.assertTrue(waittingForFailOver());
+ return super.nodeHeartbeat(request);
+ }
+ }
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestApplicationClientProtocolOnHA.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestApplicationClientProtocolOnHA.java
new file mode 100644
index 00000000000..2aa6cc639bc
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestApplicationClientProtocolOnHA.java
@@ -0,0 +1,211 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.client;
+
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerReport;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.api.records.QueueInfo;
+import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.Token;
+import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.util.Records;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestApplicationClientProtocolOnHA extends ProtocolHATestBase {
+ private YarnClient client = null;
+
+ @Before
+ public void initiate() throws Exception {
+ startHACluster(1, true, false);
+ Configuration conf = new YarnConfiguration(this.conf);
+ client = createAndStartYarnClient(conf);
+ }
+
+ @After
+ public void shutDown() {
+ if (client != null) {
+ client.stop();
+ }
+ }
+
+ @Test(timeout = 15000)
+ public void testGetApplicationReportOnHA() throws Exception {
+ ApplicationReport report =
+ client.getApplicationReport(cluster.createFakeAppId());
+ Assert.assertTrue(report != null);
+ Assert.assertEquals(cluster.createFakeAppReport(), report);
+ }
+
+ @Test(timeout = 15000)
+ public void testGetNewApplicationOnHA() throws Exception {
+ ApplicationId appId =
+ client.createApplication().getApplicationSubmissionContext()
+ .getApplicationId();
+ Assert.assertTrue(appId != null);
+ Assert.assertEquals(cluster.createFakeAppId(), appId);
+ }
+
+ @Test(timeout = 15000)
+ public void testGetClusterMetricsOnHA() throws Exception {
+ YarnClusterMetrics clusterMetrics =
+ client.getYarnClusterMetrics();
+ Assert.assertTrue(clusterMetrics != null);
+ Assert.assertEquals(cluster.createFakeYarnClusterMetrics(),
+ clusterMetrics);
+ }
+
+ @Test(timeout = 15000)
+ public void testGetApplicationsOnHA() throws Exception {
+ List<ApplicationReport> reports =
+ client.getApplications();
+ Assert.assertTrue(reports != null && !reports.isEmpty());
+ Assert.assertEquals(cluster.createFakeAppReports(),
+ reports);
+ }
+
+ @Test(timeout = 15000)
+ public void testGetClusterNodesOnHA() throws Exception {
+ List<NodeReport> reports = client.getNodeReports(NodeState.RUNNING);
+ Assert.assertTrue(reports != null && !reports.isEmpty());
+ Assert.assertEquals(cluster.createFakeNodeReports(),
+ reports);
+ }
+
+ @Test(timeout = 15000)
+ public void testGetQueueInfoOnHA() throws Exception {
+ QueueInfo queueInfo = client.getQueueInfo("root");
+ Assert.assertTrue(queueInfo != null);
+ Assert.assertEquals(cluster.createFakeQueueInfo(),
+ queueInfo);
+ }
+
+ @Test(timeout = 15000)
+ public void testGetQueueUserAclsOnHA() throws Exception {
+ List<QueueUserACLInfo> queueUserAclsList = client.getQueueAclsInfo();
+ Assert.assertTrue(queueUserAclsList != null
+ && !queueUserAclsList.isEmpty());
+ Assert.assertEquals(cluster.createFakeQueueUserACLInfoList(),
+ queueUserAclsList);
+ }
+
+ @Test(timeout = 15000)
+ public void testGetApplicationAttemptReportOnHA() throws Exception {
+ ApplicationAttemptReport report =
+ client.getApplicationAttemptReport(cluster
+ .createFakeApplicationAttemptId());
+ Assert.assertTrue(report != null);
+ Assert.assertEquals(cluster.createFakeApplicationAttemptReport(), report);
+ }
+
+ @Test(timeout = 15000)
+ public void testGetApplicationAttemptsOnHA() throws Exception {
+ List<ApplicationAttemptReport> reports =
+ client.getApplicationAttempts(cluster.createFakeAppId());
+ Assert.assertTrue(reports != null && !reports.isEmpty());
+ Assert.assertEquals(cluster.createFakeApplicationAttemptReports(),
+ reports);
+ }
+
+ @Test(timeout = 15000)
+ public void testGetContainerReportOnHA() throws Exception {
+ ContainerReport report =
+ client.getContainerReport(cluster.createFakeContainerId());
+ Assert.assertTrue(report != null);
+ Assert.assertEquals(cluster.createFakeContainerReport(), report);
+ }
+
+ @Test(timeout = 15000)
+ public void testGetContainersOnHA() throws Exception {
+ List<ContainerReport> reports =
+ client.getContainers(cluster.createFakeApplicationAttemptId());
+ Assert.assertTrue(reports != null && !reports.isEmpty());
+ Assert.assertEquals(cluster.createFakeContainerReports(),
+ reports);
+ }
+
+ @Test(timeout = 15000)
+ public void testSubmitApplicationOnHA() throws Exception {
+ ApplicationSubmissionContext appContext =
+ Records.newRecord(ApplicationSubmissionContext.class);
+ appContext.setApplicationId(cluster.createFakeAppId());
+ ContainerLaunchContext amContainer =
+ Records.newRecord(ContainerLaunchContext.class);
+ appContext.setAMContainerSpec(amContainer);
+ Resource capability = Records.newRecord(Resource.class);
+ capability.setMemory(10);
+ capability.setVirtualCores(1);
+ appContext.setResource(capability);
+ ApplicationId appId = client.submitApplication(appContext);
+ Assert.assertTrue(getActiveRM().getRMContext().getRMApps()
+ .containsKey(appId));
+ }
+
+ @Test(timeout = 15000)
+ public void testMoveApplicationAcrossQueuesOnHA() throws Exception{
+ client.moveApplicationAcrossQueues(cluster.createFakeAppId(), "root");
+ }
+
+ @Test(timeout = 15000)
+ public void testForceKillApplicationOnHA() throws Exception {
+ client.killApplication(cluster.createFakeAppId());
+ }
+
+ @Test(timeout = 15000)
+ public void testGetDelegationTokenOnHA() throws Exception {
+ Token token = client.getRMDelegationToken(new Text(" "));
+ Assert.assertEquals(token, cluster.createFakeToken());
+ }
+
+ @Test(timeout = 15000)
+ public void testRenewDelegationTokenOnHA() throws Exception {
+ RenewDelegationTokenRequest request =
+ RenewDelegationTokenRequest.newInstance(cluster.createFakeToken());
+ long newExpirationTime =
+ ClientRMProxy.createRMProxy(this.conf, ApplicationClientProtocol.class)
+ .renewDelegationToken(request).getNextExpirationTime();
+ Assert.assertEquals(newExpirationTime, cluster.createNextExpirationTime());
+ }
+
+ @Test(timeout = 15000)
+ public void testCancelDelegationTokenOnHA() throws Exception {
+ CancelDelegationTokenRequest request =
+ CancelDelegationTokenRequest.newInstance(cluster.createFakeToken());
+ ClientRMProxy.createRMProxy(this.conf, ApplicationClientProtocol.class)
+ .cancelDelegationToken(request);
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceTrackerOnHA.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceTrackerOnHA.java
new file mode 100644
index 00000000000..f2d8bc283d2
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceTrackerOnHA.java
@@ -0,0 +1,91 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.client;
+
+import java.io.IOException;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.api.ResourceTracker;
+import org.apache.hadoop.yarn.server.api.ServerRMProxy;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
+import org.apache.hadoop.yarn.server.api.records.NodeStatus;
+import org.apache.hadoop.yarn.util.YarnVersionInfo;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestResourceTrackerOnHA extends ProtocolHATestBase{
+
+ private ResourceTracker resourceTracker = null;
+
+ @Before
+ public void initiate() throws Exception {
+ startHACluster(0, false, true);
+ this.resourceTracker = getRMClient();
+ }
+
+ @After
+ public void shutDown() {
+ if(this.resourceTracker != null) {
+ RPC.stopProxy(this.resourceTracker);
+ }
+ }
+
+ @Test(timeout = 15000)
+ public void testResourceTrackerOnHA() throws Exception {
+ NodeId nodeId = NodeId.newInstance("localhost", 0);
+ Resource resource = Resource.newInstance(2048, 4);
+
+ // make sure registerNodeManager works when failover happens
+ RegisterNodeManagerRequest request =
+ RegisterNodeManagerRequest.newInstance(nodeId, 0, resource,
+ YarnVersionInfo.getVersion(), null);
+ resourceTracker.registerNodeManager(request);
+ Assert.assertTrue(waitForNodeManagerToConnect(10000, nodeId));
+
+ // restart the failover thread, and make sure nodeHeartbeat works
+ failoverThread = createAndStartFailoverThread();
+ NodeStatus status =
+ NodeStatus.newInstance(NodeId.newInstance("localhost", 0), 0, null,
+ null, null);
+ NodeHeartbeatRequest request2 =
+ NodeHeartbeatRequest.newInstance(status, null, null);
+ resourceTracker.nodeHeartbeat(request2);
+ }
+
+ private ResourceTracker getRMClient() throws IOException {
+ return ServerRMProxy.createRMProxy(this.conf, ResourceTracker.class);
+ }
+
+ private boolean waitForNodeManagerToConnect(int timeout, NodeId nodeId)
+ throws Exception {
+ for (int i = 0; i < timeout / 100; i++) {
+ if (getActiveRM().getRMContext().getRMNodes().containsKey(nodeId)) {
+ return true;
+ }
+ Thread.sleep(100);
+ }
+ return false;
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ResourceTracker.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ResourceTracker.java
index 56cc3179e4b..ad8a6256a11 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ResourceTracker.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ResourceTracker.java
@@ -19,6 +19,8 @@ package org.apache.hadoop.yarn.server.api;
import java.io.IOException;
+import org.apache.hadoop.io.retry.AtMostOnce;
+import org.apache.hadoop.io.retry.Idempotent;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
@@ -27,10 +29,12 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResp
public interface ResourceTracker {
+ @Idempotent
public RegisterNodeManagerResponse registerNodeManager(
RegisterNodeManagerRequest request) throws YarnException,
IOException;
+ @AtMostOnce
public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
throws YarnException, IOException;
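The two ResourceTracker annotations differ for a reason, sketched below in a hedged way (request construction mirrors TestResourceTrackerOnHA above; the responseId remark is background on the heartbeat protocol, not something introduced by this patch).

    import org.apache.hadoop.yarn.api.records.NodeId;
    import org.apache.hadoop.yarn.api.records.Resource;
    import org.apache.hadoop.yarn.server.api.ResourceTracker;
    import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
    import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
    import org.apache.hadoop.yarn.server.api.records.NodeStatus;
    import org.apache.hadoop.yarn.util.YarnVersionInfo;

    final class ResourceTrackerAnnotationSketch {
      static void registerAndHeartbeat(ResourceTracker tracker) throws Exception {
        NodeId nodeId = NodeId.newInstance("localhost", 0);
        Resource capability = Resource.newInstance(2048, 4);
        // @Idempotent: re-registering the same node on the new active RM is harmless.
        tracker.registerNodeManager(RegisterNodeManagerRequest.newInstance(
            nodeId, 0, capability, YarnVersionInfo.getVersion(), null));
        // @AtMostOnce: each heartbeat carries a responseId inside NodeStatus
        // (0 for the first beat), so a blind replay is not free; the RM uses that
        // id to detect duplicate or out-of-order heartbeats.
        NodeStatus status = NodeStatus.newInstance(nodeId, 0, null, null, null);
        tracker.nodeHeartbeat(NodeHeartbeatRequest.newInstance(status, null, null));
      }
    }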
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
index 61dcd975503..f2e7edba7aa 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
@@ -504,16 +504,11 @@ public class ClientRMService extends AbstractService implements
throw RPCUtil.getRemoteException(ie);
}
- // Though duplication will checked again when app is put into rmContext,
- // but it is good to fail the invalid submission as early as possible.
+ // Check whether the app has already been put into rmContext.
+ // If it has, simply return the response.
if (rmContext.getRMApps().get(applicationId) != null) {
- String message = "Application with id " + applicationId +
- " is already present! Cannot add a duplicate!";
- LOG.warn(message);
- RMAuditLogger.logFailure(user, AuditConstants.SUBMIT_APP_REQUEST,
- message, "ClientRMService", "Exception in submitting application",
- applicationId);
- throw RPCUtil.getRemoteException(message);
+ LOG.info("This is an earlier submitted application: " + applicationId);
+ return SubmitApplicationResponse.newInstance();
}
if (submissionContext.getQueue() == null) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
index 31e3767bba3..f4f2e209ed2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
@@ -718,7 +718,7 @@ public class RMAppImpl implements RMApp, Recoverable {
}
// TODO: Write out change to state store (YARN-1558)
-
+ // Also take care of RM failover
moveEvent.getResult().set(null);
}
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
index 852a0e11867..13ab17cf083 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
@@ -589,10 +589,8 @@ public class TestClientRMService {
// duplicate appId
try {
rmService.submitApplication(submitRequest2);
- Assert.fail("Exception is expected.");
} catch (YarnException e) {
- Assert.assertTrue("The thrown exception is not expected.",
- e.getMessage().contains("Cannot add a duplicate!"));
+ Assert.fail("Exception is not expected.");
}
GetApplicationsRequest getAllAppsRequest =
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestSubmitApplicationWithRMHA.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestSubmitApplicationWithRMHA.java
index 484ba8951fb..4cceddd2940 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestSubmitApplicationWithRMHA.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestSubmitApplicationWithRMHA.java
@@ -219,4 +219,89 @@ public class TestSubmitApplicationWithRMHA extends RMHATestBase{
Assert.assertEquals(appReport3.getYarnApplicationState(),
appReport4.getYarnApplicationState());
}
+
+ // There are two scenarios when RM failover happens
+ // during SubmitApplication Call:
+ // 1) RMStateStore already saved the ApplicationState when failover happens
+ // 2) RMStateStore did not save the ApplicationState when failover happens
+ @Test (timeout = 5000)
+ public void
+ testHandleRMHADuringSubmitApplicationCallWithSavedApplicationState()
+ throws Exception {
+ // Test scenario 1 when RM failover happens
+ // during SubmitApplication Call:
+ // RMStateStore already saved the ApplicationState when failover happens
+ startRMs();
+
+ // Submit Application
+ // After submission, the applicationState will be saved in RMStateStore.
+ RMApp app0 = rm1.submitApp(200);
+
+ // Do the failover
+ explicitFailover();
+
+ // Since the applicationState has already been saved in RMStateStore
+ // before failover happens, the current active RM can load the previous
+ // applicationState.
+ // This RMApp should exist in the RMContext of current active RM
+ Assert.assertTrue(rm2.getRMContext().getRMApps()
+ .containsKey(app0.getApplicationId()));
+
+ // When we re-submit the application with the same applicationId, the RM
+ // will check whether this application already exists. If it does, it
+ // simply returns a SubmitApplicationResponse.
+ RMApp app1 =
+ rm2.submitApp(200, "", UserGroupInformation
+ .getCurrentUser().getShortUserName(), null, false, null,
+ configuration.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
+ YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null,
+ false, false, true, app0.getApplicationId());
+
+ Assert.assertEquals(app1.getApplicationId(), app0.getApplicationId());
+ }
+
+ @Test (timeout = 5000)
+ public void
+ testHandleRMHADuringSubmitApplicationCallWithoutSavedApplicationState()
+ throws Exception {
+ // Test scenario 2 when RM failover happens
+ // during SubmitApplication Call:
+ // RMStateStore did not save the ApplicationState when failover happens.
+ // Using customized RMAppManager.
+ startRMsWithCustomizedRMAppManager();
+
+ // Submit Application
+ // After submission, the applicationState will
+ // not be saved in RMStateStore
+ RMApp app0 =
+ rm1.submitApp(200, "", UserGroupInformation
+ .getCurrentUser().getShortUserName(), null, false, null,
+ configuration.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
+ YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null,
+ false, false);
+
+ // Do the failover
+ explicitFailover();
+
+ // When failover happens, the RMStateStore has not saved applicationState.
+ // The applicationState of this RMApp is lost.
+ // We should not find the RMApp in the RMContext of the current active RM.
+ Assert.assertFalse(rm2.getRMContext().getRMApps()
+ .containsKey(app0.getApplicationId()));
+
+ // Submit the application with the previous ApplicationId to the current
+ // active RM. This mimics the behavior of ApplicationClientProtocol#
+ // submitApplication() when failover happens during the submission process,
+ // because the submitApplication API is marked as idempotent.
+ RMApp app1 =
+ rm2.submitApp(200, "", UserGroupInformation
+ .getCurrentUser().getShortUserName(), null, false, null,
+ configuration.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
+ YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null,
+ false, false, true, app0.getApplicationId());
+
+ verifySubmitApp(rm2, app1, app0.getApplicationId());
+ Assert.assertTrue(rm2.getRMContext().getRMApps()
+ .containsKey(app0.getApplicationId()));
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java
index 0adbf65d603..8632815f33f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java
@@ -25,7 +25,6 @@ import java.net.UnknownHostException;
import java.util.Collection;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -243,12 +242,7 @@ public class MiniYARNCluster extends CompositeService {
}
for (int i = 0; i < resourceManagers.length; i++) {
- resourceManagers[i] = new ResourceManager() {
- @Override
- protected void doSecureLogin() throws IOException {
- // Don't try to login using keytab in the testcases.
- }
- };
+ resourceManagers[i] = createResourceManager();
if (!useFixedPorts) {
if (HAUtil.isHAEnabled(conf)) {
setHARMConfiguration(i, conf);
@@ -676,7 +670,7 @@ public class MiniYARNCluster extends CompositeService {
}
return false;
}
-
+
private class ApplicationHistoryServerWrapper extends AbstractService {
public ApplicationHistoryServerWrapper() {
super(ApplicationHistoryServerWrapper.class.getName());
@@ -736,4 +730,13 @@ public class MiniYARNCluster extends CompositeService {
public ApplicationHistoryServer getApplicationHistoryServer() {
return this.appHistoryServer;
}
+
+ protected ResourceManager createResourceManager() {
+ return new ResourceManager(){
+ @Override
+ protected void doSecureLogin() throws IOException {
+ // Don't try to login using keytab in the testcases.
+ }
+ };
+ }
}