diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 53dea295163..f28bf085232 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -565,6 +565,11 @@ Release 2.4.0 - UNRELEASED YARN-1867. Fixed a bug in ResourceManager that was causing invalid ACL checks in the web-services after fail-over. (Vinod Kumar Vavilapalli) + YARN-1521. Mark Idempotent/AtMostOnce annotations to the APIs in + ApplicationClientProtcol, ResourceManagerAdministrationProtocol and + ResourceTrackerProtocol so that they work in HA scenario. (Xuan Gong + via jianhe) + Release 2.3.1 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationClientProtocol.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationClientProtocol.java index 8db91517bea..e449c1ee3f2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationClientProtocol.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationClientProtocol.java @@ -104,6 +104,7 @@ public interface ApplicationClientProtocol { */ @Public @Stable + @Idempotent public GetNewApplicationResponse getNewApplication( GetNewApplicationRequest request) throws YarnException, IOException; @@ -133,6 +134,10 @@ public interface ApplicationClientProtocol { * it encounters the {@link ApplicationNotFoundException} on the * {@link #getApplicationReport(GetApplicationReportRequest)} call.

 *
+ * During the submission process, it checks whether the application
+ * already exists. If the application exists, it will simply return a
+ * SubmitApplicationResponse.
+ *
 * In secure mode, the ResourceManager verifies access to
 * queues etc. before accepting the application submission.

* @@ -147,6 +152,7 @@ public interface ApplicationClientProtocol { */ @Public @Stable + @Idempotent public SubmitApplicationResponse submitApplication( SubmitApplicationRequest request) throws YarnException, IOException; @@ -173,6 +179,7 @@ public interface ApplicationClientProtocol { */ @Public @Stable + @Idempotent public KillApplicationResponse forceKillApplication( KillApplicationRequest request) throws YarnException, IOException; @@ -231,6 +238,7 @@ public interface ApplicationClientProtocol { */ @Public @Stable + @Idempotent public GetClusterMetricsResponse getClusterMetrics( GetClusterMetricsRequest request) throws YarnException, IOException; @@ -258,6 +266,7 @@ public interface ApplicationClientProtocol { */ @Public @Stable + @Idempotent public GetApplicationsResponse getApplications( GetApplicationsRequest request) throws YarnException, IOException; @@ -277,6 +286,7 @@ public interface ApplicationClientProtocol { */ @Public @Stable + @Idempotent public GetClusterNodesResponse getClusterNodes( GetClusterNodesRequest request) throws YarnException, IOException; @@ -298,6 +308,7 @@ public interface ApplicationClientProtocol { */ @Public @Stable + @Idempotent public GetQueueInfoResponse getQueueInfo( GetQueueInfoRequest request) throws YarnException, IOException; @@ -317,6 +328,7 @@ public interface ApplicationClientProtocol { */ @Public @Stable + @Idempotent public GetQueueUserAclsInfoResponse getQueueUserAcls( GetQueueUserAclsInfoRequest request) throws YarnException, IOException; @@ -335,6 +347,7 @@ public interface ApplicationClientProtocol { */ @Public @Stable + @Idempotent public GetDelegationTokenResponse getDelegationToken( GetDelegationTokenRequest request) throws YarnException, IOException; @@ -349,6 +362,7 @@ public interface ApplicationClientProtocol { */ @Private @Unstable + @Idempotent public RenewDelegationTokenResponse renewDelegationToken( RenewDelegationTokenRequest request) throws YarnException, IOException; @@ -363,6 +377,7 @@ public interface ApplicationClientProtocol { */ @Private @Unstable + @Idempotent public CancelDelegationTokenResponse cancelDelegationToken( CancelDelegationTokenRequest request) throws YarnException, IOException; @@ -377,6 +392,7 @@ public interface ApplicationClientProtocol { */ @Public @Unstable + @Idempotent public MoveApplicationAcrossQueuesResponse moveApplicationAcrossQueues( MoveApplicationAcrossQueuesRequest request) throws YarnException, IOException; @@ -422,6 +438,7 @@ public interface ApplicationClientProtocol { */ @Public @Unstable + @Idempotent public GetApplicationAttemptReportResponse getApplicationAttemptReport( GetApplicationAttemptReportRequest request) throws YarnException, IOException; @@ -453,6 +470,7 @@ public interface ApplicationClientProtocol { */ @Public @Unstable + @Idempotent public GetApplicationAttemptsResponse getApplicationAttempts( GetApplicationAttemptsRequest request) throws YarnException, IOException; @@ -486,6 +504,7 @@ public interface ApplicationClientProtocol { */ @Public @Unstable + @Idempotent public GetContainerReportResponse getContainerReport( GetContainerReportRequest request) throws YarnException, IOException; @@ -520,6 +539,7 @@ public interface ApplicationClientProtocol { */ @Public @Unstable + @Idempotent public GetContainersResponse getContainers(GetContainersRequest request) throws YarnException, IOException; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/ResourceManagerAdministrationProtocol.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/ResourceManagerAdministrationProtocol.java index 26415992ac0..4b777eaac5f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/ResourceManagerAdministrationProtocol.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/ResourceManagerAdministrationProtocol.java @@ -24,6 +24,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Evolving; import org.apache.hadoop.classification.InterfaceStability.Stable; +import org.apache.hadoop.io.retry.Idempotent; import org.apache.hadoop.ipc.StandbyException; import org.apache.hadoop.tools.GetUserMappingsProtocol; import org.apache.hadoop.yarn.api.records.NodeId; @@ -50,16 +51,19 @@ public interface ResourceManagerAdministrationProtocol extends GetUserMappingsPr @Public @Stable + @Idempotent public RefreshQueuesResponse refreshQueues(RefreshQueuesRequest request) throws StandbyException, YarnException, IOException; @Public @Stable + @Idempotent public RefreshNodesResponse refreshNodes(RefreshNodesRequest request) throws StandbyException, YarnException, IOException; @Public @Stable + @Idempotent public RefreshSuperUserGroupsConfigurationResponse refreshSuperUserGroupsConfiguration( RefreshSuperUserGroupsConfigurationRequest request) @@ -67,18 +71,21 @@ public interface ResourceManagerAdministrationProtocol extends GetUserMappingsPr @Public @Stable + @Idempotent public RefreshUserToGroupsMappingsResponse refreshUserToGroupsMappings( RefreshUserToGroupsMappingsRequest request) throws StandbyException, YarnException, IOException; @Public @Stable + @Idempotent public RefreshAdminAclsResponse refreshAdminAcls( RefreshAdminAclsRequest request) throws YarnException, IOException; @Public @Stable + @Idempotent public RefreshServiceAclsResponse refreshServiceAcls( RefreshServiceAclsRequest request) throws YarnException, IOException; @@ -99,6 +106,7 @@ public interface ResourceManagerAdministrationProtocol extends GetUserMappingsPr */ @Public @Evolving + @Idempotent public UpdateNodeResourceResponse updateNodeResource( UpdateNodeResourceRequest request) throws YarnException, IOException; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java new file mode 100644 index 00000000000..4f3cab2c327 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java @@ -0,0 +1,721 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn.client; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; + +import junit.framework.Assert; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ha.ClientBaseWithFixes; +import org.apache.hadoop.ha.HAServiceProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest; +import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainersRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainersResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse; +import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesRequest; +import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest; +import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse; +import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import 
org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerReport; +import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.NodeReport; +import org.apache.hadoop.yarn.api.records.NodeState; +import org.apache.hadoop.yarn.api.records.QueueACL; +import org.apache.hadoop.yarn.api.records.QueueInfo; +import org.apache.hadoop.yarn.api.records.QueueState; +import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; +import org.apache.hadoop.yarn.api.records.Token; +import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState; +import org.apache.hadoop.yarn.api.records.YarnApplicationState; +import org.apache.hadoop.yarn.api.records.YarnClusterMetrics; +import org.apache.hadoop.yarn.client.api.YarnClient; +import org.apache.hadoop.yarn.conf.HAUtil; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.MiniYARNCluster; +import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse; +import org.apache.hadoop.yarn.server.resourcemanager.AdminService; +import org.apache.hadoop.yarn.server.resourcemanager.ClientRMService; +import org.apache.hadoop.yarn.server.resourcemanager.NMLivelinessMonitor; +import org.apache.hadoop.yarn.server.resourcemanager.NodesListManager; +import org.apache.hadoop.yarn.server.resourcemanager.RMAppManager; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; +import org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; +import org.apache.hadoop.yarn.server.resourcemanager.security.QueueACLsManager; +import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; +import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager; +import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; +import org.apache.hadoop.yarn.util.Records; +import org.junit.After; +import org.junit.Before; + + +public abstract class ProtocolHATestBase extends ClientBaseWithFixes{ + protected static final HAServiceProtocol.StateChangeRequestInfo req = + new HAServiceProtocol.StateChangeRequestInfo( + HAServiceProtocol.RequestSource.REQUEST_BY_USER); + + protected static final String RM1_NODE_ID = "rm1"; + protected static final int RM1_PORT_BASE = 10000; + protected static final String RM2_NODE_ID = "rm2"; + protected static final int RM2_PORT_BASE = 20000; + + protected Configuration conf; + protected MiniYARNClusterForHATesting cluster; + + protected Thread failoverThread = null; + private volatile boolean keepRunning; + + private void setConfForRM(String rmId, String prefix, String value) { + conf.set(HAUtil.addSuffix(prefix, rmId), value); + } + + private void setRpcAddressForRM(String rmId, int base) { + setConfForRM(rmId, 
YarnConfiguration.RM_ADDRESS, "0.0.0.0:" + + (base + YarnConfiguration.DEFAULT_RM_PORT)); + setConfForRM(rmId, YarnConfiguration.RM_SCHEDULER_ADDRESS, "0.0.0.0:" + + (base + YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT)); + setConfForRM(rmId, YarnConfiguration.RM_ADMIN_ADDRESS, "0.0.0.0:" + + (base + YarnConfiguration.DEFAULT_RM_ADMIN_PORT)); + setConfForRM(rmId, YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS, + "0.0.0.0:" + (base + YarnConfiguration + .DEFAULT_RM_RESOURCE_TRACKER_PORT)); + setConfForRM(rmId, YarnConfiguration.RM_WEBAPP_ADDRESS, "0.0.0.0:" + + (base + YarnConfiguration.DEFAULT_RM_WEBAPP_PORT)); + setConfForRM(rmId, YarnConfiguration.RM_WEBAPP_HTTPS_ADDRESS, "0.0.0.0:" + + (base + YarnConfiguration.DEFAULT_RM_WEBAPP_HTTPS_PORT)); + } + + @Before + public void setup() throws IOException { + failoverThread = null; + keepRunning = true; + conf = new YarnConfiguration(); + conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true); + conf.setInt(YarnConfiguration.CLIENT_FAILOVER_MAX_ATTEMPTS, 5); + conf.set(YarnConfiguration.RM_HA_IDS, RM1_NODE_ID + "," + RM2_NODE_ID); + setRpcAddressForRM(RM1_NODE_ID, RM1_PORT_BASE); + setRpcAddressForRM(RM2_NODE_ID, RM2_PORT_BASE); + + conf.setLong(YarnConfiguration.CLIENT_FAILOVER_SLEEPTIME_BASE_MS, 100L); + + conf.setBoolean(YarnConfiguration.YARN_MINICLUSTER_FIXED_PORTS, true); + conf.setBoolean(YarnConfiguration.YARN_MINICLUSTER_USE_RPC, true); + } + + @After + public void teardown() throws Exception { + keepRunning = false; + if (failoverThread != null) { + failoverThread.interrupt(); + try { + failoverThread.join(); + } catch (InterruptedException ex) { + LOG.error("Error joining with failover thread", ex); + } + } + cluster.stop(); + } + + protected AdminService getAdminService(int index) { + return cluster.getResourceManager(index).getRMContext() + .getRMAdminService(); + } + + protected void explicitFailover() throws IOException { + int activeRMIndex = cluster.getActiveRMIndex(); + int newActiveRMIndex = (activeRMIndex + 1) % 2; + getAdminService(activeRMIndex).transitionToStandby(req); + getAdminService(newActiveRMIndex).transitionToActive(req); + assertEquals("Failover failed", newActiveRMIndex, + cluster.getActiveRMIndex()); + } + + protected YarnClient createAndStartYarnClient(Configuration conf) { + Configuration configuration = new YarnConfiguration(conf); + YarnClient client = YarnClient.createYarnClient(); + client.init(configuration); + client.start(); + return client; + } + + protected void verifyConnections() throws InterruptedException, + YarnException { + assertTrue("NMs failed to connect to the RM", + cluster.waitForNodeManagersToConnect(20000)); + verifyClientConnection(); + } + + protected void verifyClientConnection() { + int numRetries = 3; + while(numRetries-- > 0) { + Configuration conf = new YarnConfiguration(this.conf); + YarnClient client = createAndStartYarnClient(conf); + try { + Thread.sleep(100); + client.getApplications(); + return; + } catch (Exception e) { + LOG.error(e.getMessage()); + } finally { + client.stop(); + } + } + fail("Client couldn't connect to the Active RM"); + } + + protected Thread createAndStartFailoverThread() { + Thread failoverThread = new Thread() { + public void run() { + keepRunning = true; + while (keepRunning) { + if (cluster.getStartFailoverFlag()) { + try { + explicitFailover(); + keepRunning = false; + cluster.resetFailoverTriggeredFlag(true); + } catch (Exception e) { + // Do Nothing + } finally { + keepRunning = false; + } + } + try { + Thread.sleep(50); + } catch 
(InterruptedException e) { + // DO NOTHING + } + } + } + }; + failoverThread.start(); + return failoverThread; + } + + protected void startHACluster(int numOfNMs, boolean overrideClientRMService, + boolean overrideRTS) throws Exception { + conf.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false); + cluster = + new MiniYARNClusterForHATesting(TestRMFailover.class.getName(), 2, + numOfNMs, 1, 1, false, overrideClientRMService, overrideRTS); + cluster.resetStartFailoverFlag(false); + cluster.init(conf); + cluster.start(); + getAdminService(0).transitionToActive(req); + assertFalse("RM never turned active", -1 == cluster.getActiveRMIndex()); + verifyConnections(); + + // Do the failover + explicitFailover(); + verifyConnections(); + + failoverThread = createAndStartFailoverThread(); + + } + + protected ResourceManager getActiveRM() { + return cluster.getResourceManager(cluster.getActiveRMIndex()); + } + + public class MiniYARNClusterForHATesting extends MiniYARNCluster { + + private boolean overrideClientRMService; + private boolean overrideRTS; + private final AtomicBoolean startFailover = new AtomicBoolean(false); + private final AtomicBoolean failoverTriggered = new AtomicBoolean(false); + + public MiniYARNClusterForHATesting(String testName, + int numResourceManagers, int numNodeManagers, int numLocalDirs, + int numLogDirs, boolean enableAHS, boolean overrideClientRMService, + boolean overrideRTS) { + super(testName, numResourceManagers, numNodeManagers, numLocalDirs, + numLogDirs, enableAHS); + this.overrideClientRMService = overrideClientRMService; + this.overrideRTS = overrideRTS; + } + + public boolean getStartFailoverFlag() { + return startFailover.get(); + } + + public void resetStartFailoverFlag(boolean flag) { + startFailover.set(flag); + } + + public void resetFailoverTriggeredFlag(boolean flag) { + failoverTriggered.set(flag); + } + + private boolean waittingForFailOver() { + int maximumWaittingTime = 50; + int count = 0; + while (!failoverTriggered.get() && count >= maximumWaittingTime) { + try { + Thread.sleep(100); + } catch (InterruptedException e) { + // DO NOTHING + } + count++; + } + if (count >= maximumWaittingTime) { + return false; + } + return true; + } + + @Override + protected ResourceManager createResourceManager() { + return new ResourceManager() { + @Override + protected void doSecureLogin() throws IOException { + // Don't try to login using keytab in the testcases. 
+ } + @Override + protected ClientRMService createClientRMService() { + if (overrideClientRMService) { + return new CustomedClientRMService(this.rmContext, this.scheduler, + this.rmAppManager, this.applicationACLsManager, + this.queueACLsManager, + this.rmContext.getRMDelegationTokenSecretManager()); + } + return super.createClientRMService(); + } + @Override + protected ResourceTrackerService createResourceTrackerService() { + if (overrideRTS) { + return new CustomedResourceTrackerService(this.rmContext, + this.nodesListManager, this.nmLivelinessMonitor, + this.rmContext.getContainerTokenSecretManager(), + this.rmContext.getNMTokenSecretManager()); + } + return super.createResourceTrackerService(); + } + }; + } + + private class CustomedClientRMService extends ClientRMService { + public CustomedClientRMService(RMContext rmContext, + YarnScheduler scheduler, RMAppManager rmAppManager, + ApplicationACLsManager applicationACLsManager, + QueueACLsManager queueACLsManager, + RMDelegationTokenSecretManager rmDTSecretManager) { + super(rmContext, scheduler, rmAppManager, applicationACLsManager, + queueACLsManager, rmDTSecretManager); + } + + @Override + public GetNewApplicationResponse getNewApplication( + GetNewApplicationRequest request) throws YarnException { + resetStartFailoverFlag(true); + + // make sure failover has been triggered + Assert.assertTrue(waittingForFailOver()); + + // create the GetNewApplicationResponse with fake applicationId + GetNewApplicationResponse response = + GetNewApplicationResponse.newInstance( + createFakeAppId(), null, null); + return response; + } + + @Override + public GetApplicationReportResponse getApplicationReport( + GetApplicationReportRequest request) throws YarnException { + resetStartFailoverFlag(true); + + // make sure failover has been triggered + Assert.assertTrue(waittingForFailOver()); + + // create a fake application report + ApplicationReport report = createFakeAppReport(); + GetApplicationReportResponse response = + GetApplicationReportResponse.newInstance(report); + return response; + } + + @Override + public GetClusterMetricsResponse getClusterMetrics( + GetClusterMetricsRequest request) throws YarnException { + resetStartFailoverFlag(true); + + // make sure failover has been triggered + Assert.assertTrue(waittingForFailOver()); + + // create GetClusterMetricsResponse with fake YarnClusterMetrics + GetClusterMetricsResponse response = + GetClusterMetricsResponse.newInstance( + createFakeYarnClusterMetrics()); + return response; + } + + @Override + public GetApplicationsResponse getApplications( + GetApplicationsRequest request) throws YarnException { + resetStartFailoverFlag(true); + + // make sure failover has been triggered + Assert.assertTrue(waittingForFailOver()); + + // create GetApplicationsResponse with fake applicationList + GetApplicationsResponse response = + GetApplicationsResponse.newInstance(createFakeAppReports()); + return response; + } + + @Override + public GetClusterNodesResponse getClusterNodes( + GetClusterNodesRequest request) + throws YarnException { + resetStartFailoverFlag(true); + + // make sure failover has been triggered + Assert.assertTrue(waittingForFailOver()); + + // create GetClusterNodesResponse with fake ClusterNodeLists + GetClusterNodesResponse response = + GetClusterNodesResponse.newInstance(createFakeNodeReports()); + return response; + } + + @Override + public GetQueueInfoResponse getQueueInfo(GetQueueInfoRequest request) + throws YarnException { + resetStartFailoverFlag(true); + + // make sure failover 
has been triggered + Assert.assertTrue(waittingForFailOver()); + + // return fake QueueInfo + return GetQueueInfoResponse.newInstance(createFakeQueueInfo()); + } + + @Override + public GetQueueUserAclsInfoResponse getQueueUserAcls( + GetQueueUserAclsInfoRequest request) throws YarnException { + resetStartFailoverFlag(true); + + // make sure failover has been triggered + Assert.assertTrue(waittingForFailOver()); + + // return fake queueUserAcls + return GetQueueUserAclsInfoResponse + .newInstance(createFakeQueueUserACLInfoList()); + } + + @Override + public GetApplicationAttemptReportResponse getApplicationAttemptReport( + GetApplicationAttemptReportRequest request) throws YarnException, + IOException { + resetStartFailoverFlag(true); + + // make sure failover has been triggered + Assert.assertTrue(waittingForFailOver()); + + // return fake ApplicationAttemptReport + return GetApplicationAttemptReportResponse + .newInstance(createFakeApplicationAttemptReport()); + } + + @Override + public GetApplicationAttemptsResponse getApplicationAttempts( + GetApplicationAttemptsRequest request) throws YarnException, + IOException { + resetStartFailoverFlag(true); + + // make sure failover has been triggered + Assert.assertTrue(waittingForFailOver()); + + // return fake ApplicationAttemptReports + return GetApplicationAttemptsResponse + .newInstance(createFakeApplicationAttemptReports()); + } + + @Override + public GetContainerReportResponse getContainerReport( + GetContainerReportRequest request) throws YarnException, + IOException { + resetStartFailoverFlag(true); + + // make sure failover has been triggered + Assert.assertTrue(waittingForFailOver()); + + // return fake containerReport + return GetContainerReportResponse + .newInstance(createFakeContainerReport()); + } + + @Override + public GetContainersResponse getContainers(GetContainersRequest request) + throws YarnException, IOException { + resetStartFailoverFlag(true); + + // make sure failover has been triggered + Assert.assertTrue(waittingForFailOver()); + + // return fake ContainerReports + return GetContainersResponse.newInstance(createFakeContainerReports()); + } + + @Override + public SubmitApplicationResponse submitApplication( + SubmitApplicationRequest request) throws YarnException { + resetStartFailoverFlag(true); + + // make sure failover has been triggered + Assert.assertTrue(waittingForFailOver()); + + return super.submitApplication(request); + } + + @Override + public KillApplicationResponse forceKillApplication( + KillApplicationRequest request) throws YarnException { + resetStartFailoverFlag(true); + + // make sure failover has been triggered + Assert.assertTrue(waittingForFailOver()); + + return KillApplicationResponse.newInstance(true); + } + + @Override + public MoveApplicationAcrossQueuesResponse moveApplicationAcrossQueues( + MoveApplicationAcrossQueuesRequest request) throws YarnException { + resetStartFailoverFlag(true); + + // make sure failover has been triggered + Assert.assertTrue(waittingForFailOver()); + + return Records.newRecord(MoveApplicationAcrossQueuesResponse.class); + } + + @Override + public GetDelegationTokenResponse getDelegationToken( + GetDelegationTokenRequest request) throws YarnException { + resetStartFailoverFlag(true); + + // make sure failover has been triggered + Assert.assertTrue(waittingForFailOver()); + + return GetDelegationTokenResponse.newInstance(createFakeToken()); + } + + @Override + public RenewDelegationTokenResponse renewDelegationToken( + RenewDelegationTokenRequest request) throws 
YarnException { + resetStartFailoverFlag(true); + + // make sure failover has been triggered + Assert.assertTrue(waittingForFailOver()); + + return RenewDelegationTokenResponse + .newInstance(createNextExpirationTime()); + } + + @Override + public CancelDelegationTokenResponse cancelDelegationToken( + CancelDelegationTokenRequest request) throws YarnException { + resetStartFailoverFlag(true); + + // make sure failover has been triggered + Assert.assertTrue(waittingForFailOver()); + + return CancelDelegationTokenResponse.newInstance(); + } + } + + public ApplicationReport createFakeAppReport() { + ApplicationId appId = ApplicationId.newInstance(1000l, 1); + ApplicationAttemptId attemptId = + ApplicationAttemptId.newInstance(appId, 1); + // create a fake application report + ApplicationReport report = + ApplicationReport.newInstance(appId, attemptId, "fakeUser", + "fakeQueue", "fakeApplicationName", "localhost", 0, null, + YarnApplicationState.FAILED, "fake an application report", "", + 1000l, 1200l, FinalApplicationStatus.FAILED, null, "", 50f, + "fakeApplicationType", null); + return report; + } + + public List createFakeAppReports() { + List reports = new ArrayList(); + reports.add(createFakeAppReport()); + return reports; + } + + public ApplicationId createFakeAppId() { + return ApplicationId.newInstance(1000l, 1); + } + + public ApplicationAttemptId createFakeApplicationAttemptId() { + return ApplicationAttemptId.newInstance(createFakeAppId(), 0); + } + + public ContainerId createFakeContainerId() { + return ContainerId.newInstance(createFakeApplicationAttemptId(), 0); + } + + public YarnClusterMetrics createFakeYarnClusterMetrics() { + return YarnClusterMetrics.newInstance(1); + } + + public List createFakeNodeReports() { + NodeId nodeId = NodeId.newInstance("localhost", 0); + NodeReport report = + NodeReport.newInstance(nodeId, NodeState.RUNNING, "localhost", + "rack1", null, null, 4, null, 1000l); + List reports = new ArrayList(); + reports.add(report); + return reports; + } + + public QueueInfo createFakeQueueInfo() { + return QueueInfo.newInstance("root", 100f, 100f, 50f, null, + createFakeAppReports(), QueueState.RUNNING); + } + + public List createFakeQueueUserACLInfoList() { + List queueACL = new ArrayList(); + queueACL.add(QueueACL.SUBMIT_APPLICATIONS); + QueueUserACLInfo info = QueueUserACLInfo.newInstance("root", queueACL); + List infos = new ArrayList(); + infos.add(info); + return infos; + } + + public ApplicationAttemptReport createFakeApplicationAttemptReport() { + return ApplicationAttemptReport.newInstance( + createFakeApplicationAttemptId(), "localhost", 0, "", "", + YarnApplicationAttemptState.RUNNING, createFakeContainerId()); + } + + public List + createFakeApplicationAttemptReports() { + List reports = + new ArrayList(); + reports.add(createFakeApplicationAttemptReport()); + return reports; + } + + public ContainerReport createFakeContainerReport() { + return ContainerReport.newInstance(createFakeContainerId(), null, + NodeId.newInstance("localhost", 0), null, 1000l, 1200l, "", "", 0, + ContainerState.COMPLETE); + } + + public List createFakeContainerReports() { + List reports = + new ArrayList(); + reports.add(createFakeContainerReport()); + return reports; + } + + public Token createFakeToken() { + String identifier = "fake Token"; + String password = "fake token passwd"; + Token token = Token.newInstance( + identifier.getBytes(), " ", password.getBytes(), " "); + return token; + } + + public long createNextExpirationTime() { + return "fake 
Token".getBytes().length; + } + + private class CustomedResourceTrackerService extends + ResourceTrackerService { + public CustomedResourceTrackerService(RMContext rmContext, + NodesListManager nodesListManager, + NMLivelinessMonitor nmLivelinessMonitor, + RMContainerTokenSecretManager containerTokenSecretManager, + NMTokenSecretManagerInRM nmTokenSecretManager) { + super(rmContext, nodesListManager, nmLivelinessMonitor, + containerTokenSecretManager, nmTokenSecretManager); + } + + @Override + public RegisterNodeManagerResponse registerNodeManager( + RegisterNodeManagerRequest request) throws YarnException, + IOException { + resetStartFailoverFlag(true); + // make sure failover has been triggered + Assert.assertTrue(waittingForFailOver()); + return super.registerNodeManager(request); + } + + @Override + public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) + throws YarnException, IOException { + resetStartFailoverFlag(true); + // make sure failover has been triggered + Assert.assertTrue(waittingForFailOver()); + return super.nodeHeartbeat(request); + } + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestApplicationClientProtocolOnHA.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestApplicationClientProtocolOnHA.java new file mode 100644 index 00000000000..2aa6cc639bc --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestApplicationClientProtocolOnHA.java @@ -0,0 +1,211 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.client; + +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.yarn.api.ApplicationClientProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest; +import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.ContainerReport; +import org.apache.hadoop.yarn.api.records.NodeReport; +import org.apache.hadoop.yarn.api.records.NodeState; +import org.apache.hadoop.yarn.api.records.QueueInfo; +import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.Token; +import org.apache.hadoop.yarn.api.records.YarnClusterMetrics; +import org.apache.hadoop.yarn.client.api.YarnClient; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.util.Records; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class TestApplicationClientProtocolOnHA extends ProtocolHATestBase { + private YarnClient client = null; + + @Before + public void initiate() throws Exception { + startHACluster(1, true, false); + Configuration conf = new YarnConfiguration(this.conf); + client = createAndStartYarnClient(conf); + } + + @After + public void shutDown() { + if (client != null) { + client.stop(); + } + } + + @Test(timeout = 15000) + public void testGetApplicationReportOnHA() throws Exception { + ApplicationReport report = + client.getApplicationReport(cluster.createFakeAppId()); + Assert.assertTrue(report != null); + Assert.assertEquals(cluster.createFakeAppReport(), report); + } + + @Test(timeout = 15000) + public void testGetNewApplicationOnHA() throws Exception { + ApplicationId appId = + client.createApplication().getApplicationSubmissionContext() + .getApplicationId(); + Assert.assertTrue(appId != null); + Assert.assertEquals(cluster.createFakeAppId(), appId); + } + + @Test(timeout = 15000) + public void testGetClusterMetricsOnHA() throws Exception { + YarnClusterMetrics clusterMetrics = + client.getYarnClusterMetrics(); + Assert.assertTrue(clusterMetrics != null); + Assert.assertEquals(cluster.createFakeYarnClusterMetrics(), + clusterMetrics); + } + + @Test(timeout = 15000) + public void testGetApplicationsOnHA() throws Exception { + List reports = + client.getApplications(); + Assert.assertTrue(reports != null && !reports.isEmpty()); + Assert.assertEquals(cluster.createFakeAppReports(), + reports); + } + + @Test(timeout = 15000) + public void testGetClusterNodesOnHA() throws Exception { + List reports = client.getNodeReports(NodeState.RUNNING); + Assert.assertTrue(reports != null && !reports.isEmpty()); + Assert.assertEquals(cluster.createFakeNodeReports(), + reports); + } + + @Test(timeout = 15000) + public void testGetQueueInfoOnHA() throws Exception { + QueueInfo queueInfo = client.getQueueInfo("root"); + Assert.assertTrue(queueInfo != null); + Assert.assertEquals(cluster.createFakeQueueInfo(), + queueInfo); + } + + @Test(timeout = 15000) + public void testGetQueueUserAclsOnHA() throws Exception { + List 
queueUserAclsList = client.getQueueAclsInfo(); + Assert.assertTrue(queueUserAclsList != null + && !queueUserAclsList.isEmpty()); + Assert.assertEquals(cluster.createFakeQueueUserACLInfoList(), + queueUserAclsList); + } + + @Test(timeout = 15000) + public void testGetApplicationAttemptReportOnHA() throws Exception { + ApplicationAttemptReport report = + client.getApplicationAttemptReport(cluster + .createFakeApplicationAttemptId()); + Assert.assertTrue(report != null); + Assert.assertEquals(cluster.createFakeApplicationAttemptReport(), report); + } + + @Test(timeout = 15000) + public void testGetApplicationAttemptsOnHA() throws Exception { + List reports = + client.getApplicationAttempts(cluster.createFakeAppId()); + Assert.assertTrue(reports != null && !reports.isEmpty()); + Assert.assertEquals(cluster.createFakeApplicationAttemptReports(), + reports); + } + + @Test(timeout = 15000) + public void testGetContainerReportOnHA() throws Exception { + ContainerReport report = + client.getContainerReport(cluster.createFakeContainerId()); + Assert.assertTrue(report != null); + Assert.assertEquals(cluster.createFakeContainerReport(), report); + } + + @Test(timeout = 15000) + public void testGetContainersOnHA() throws Exception { + List reports = + client.getContainers(cluster.createFakeApplicationAttemptId()); + Assert.assertTrue(reports != null && !reports.isEmpty()); + Assert.assertEquals(cluster.createFakeContainerReports(), + reports); + } + + @Test(timeout = 15000) + public void testSubmitApplicationOnHA() throws Exception { + ApplicationSubmissionContext appContext = + Records.newRecord(ApplicationSubmissionContext.class); + appContext.setApplicationId(cluster.createFakeAppId()); + ContainerLaunchContext amContainer = + Records.newRecord(ContainerLaunchContext.class); + appContext.setAMContainerSpec(amContainer); + Resource capability = Records.newRecord(Resource.class); + capability.setMemory(10); + capability.setVirtualCores(1); + appContext.setResource(capability); + ApplicationId appId = client.submitApplication(appContext); + Assert.assertTrue(getActiveRM().getRMContext().getRMApps() + .containsKey(appId)); + } + + @Test(timeout = 15000) + public void testMoveApplicationAcrossQueuesOnHA() throws Exception{ + client.moveApplicationAcrossQueues(cluster.createFakeAppId(), "root"); + } + + @Test(timeout = 15000) + public void testForceKillApplicationOnHA() throws Exception { + client.killApplication(cluster.createFakeAppId()); + } + + @Test(timeout = 15000) + public void testGetDelegationTokenOnHA() throws Exception { + Token token = client.getRMDelegationToken(new Text(" ")); + Assert.assertEquals(token, cluster.createFakeToken()); + } + + @Test(timeout = 15000) + public void testRenewDelegationTokenOnHA() throws Exception { + RenewDelegationTokenRequest request = + RenewDelegationTokenRequest.newInstance(cluster.createFakeToken()); + long newExpirationTime = + ClientRMProxy.createRMProxy(this.conf, ApplicationClientProtocol.class) + .renewDelegationToken(request).getNextExpirationTime(); + Assert.assertEquals(newExpirationTime, cluster.createNextExpirationTime()); + } + + @Test(timeout = 15000) + public void testCancelDelegationTokenOnHA() throws Exception { + CancelDelegationTokenRequest request = + CancelDelegationTokenRequest.newInstance(cluster.createFakeToken()); + ClientRMProxy.createRMProxy(this.conf, ApplicationClientProtocol.class) + .cancelDelegationToken(request); + } +} diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceTrackerOnHA.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceTrackerOnHA.java new file mode 100644 index 00000000000..f2d8bc283d2 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceTrackerOnHA.java @@ -0,0 +1,91 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn.client; + +import java.io.IOException; + +import junit.framework.Assert; + +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.api.ResourceTracker; +import org.apache.hadoop.yarn.server.api.ServerRMProxy; +import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest; +import org.apache.hadoop.yarn.server.api.records.NodeStatus; +import org.apache.hadoop.yarn.util.YarnVersionInfo; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class TestResourceTrackerOnHA extends ProtocolHATestBase{ + + private ResourceTracker resourceTracker = null; + + @Before + public void initiate() throws Exception { + startHACluster(0, false, true); + this.resourceTracker = getRMClient(); + } + + @After + public void shutDown() { + if(this.resourceTracker != null) { + RPC.stopProxy(this.resourceTracker); + } + } + + @Test(timeout = 15000) + public void testResourceTrackerOnHA() throws Exception { + NodeId nodeId = NodeId.newInstance("localhost", 0); + Resource resource = Resource.newInstance(2048, 4); + + // make sure registerNodeManager works when failover happens + RegisterNodeManagerRequest request = + RegisterNodeManagerRequest.newInstance(nodeId, 0, resource, + YarnVersionInfo.getVersion(), null); + resourceTracker.registerNodeManager(request); + Assert.assertTrue(waitForNodeManagerToConnect(10000, nodeId)); + + // restart the failover thread, and make sure nodeHeartbeat works + failoverThread = createAndStartFailoverThread(); + NodeStatus status = + NodeStatus.newInstance(NodeId.newInstance("localhost", 0), 0, null, + null, null); + NodeHeartbeatRequest request2 = + NodeHeartbeatRequest.newInstance(status, null, null); + resourceTracker.nodeHeartbeat(request2); + } + + private ResourceTracker getRMClient() throws IOException { + return ServerRMProxy.createRMProxy(this.conf, ResourceTracker.class); + } + + private boolean waitForNodeManagerToConnect(int timeout, NodeId nodeId) + throws Exception { + for (int i = 0; i < timeout / 100; i++) { + if 
(getActiveRM().getRMContext().getRMNodes().containsKey(nodeId)) { + return true; + } + Thread.sleep(100); + } + return false; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ResourceTracker.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ResourceTracker.java index 56cc3179e4b..ad8a6256a11 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ResourceTracker.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ResourceTracker.java @@ -19,6 +19,8 @@ package org.apache.hadoop.yarn.server.api; import java.io.IOException; +import org.apache.hadoop.io.retry.AtMostOnce; +import org.apache.hadoop.io.retry.Idempotent; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; @@ -27,10 +29,12 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResp public interface ResourceTracker { + @Idempotent public RegisterNodeManagerResponse registerNodeManager( RegisterNodeManagerRequest request) throws YarnException, IOException; + @AtMostOnce public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) throws YarnException, IOException; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java index 61dcd975503..f2e7edba7aa 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java @@ -504,16 +504,11 @@ public class ClientRMService extends AbstractService implements throw RPCUtil.getRemoteException(ie); } - // Though duplication will checked again when app is put into rmContext, - // but it is good to fail the invalid submission as early as possible. + // Check whether app has already been put into rmContext, + // If it is, simply return the response if (rmContext.getRMApps().get(applicationId) != null) { - String message = "Application with id " + applicationId + - " is already present! 
Cannot add a duplicate!"; - LOG.warn(message); - RMAuditLogger.logFailure(user, AuditConstants.SUBMIT_APP_REQUEST, - message, "ClientRMService", "Exception in submitting application", - applicationId); - throw RPCUtil.getRemoteException(message); + LOG.info("This is an earlier submitted application: " + applicationId); + return SubmitApplicationResponse.newInstance(); } if (submissionContext.getQueue() == null) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index 31e3767bba3..f4f2e209ed2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -718,7 +718,7 @@ public class RMAppImpl implements RMApp, Recoverable { } // TODO: Write out change to state store (YARN-1558) - + // Also take care of RM failover moveEvent.getResult().set(null); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java index 852a0e11867..13ab17cf083 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java @@ -589,10 +589,8 @@ public class TestClientRMService { // duplicate appId try { rmService.submitApplication(submitRequest2); - Assert.fail("Exception is expected."); } catch (YarnException e) { - Assert.assertTrue("The thrown exception is not expected.", - e.getMessage().contains("Cannot add a duplicate!")); + Assert.fail("Exception is not expected."); } GetApplicationsRequest getAllAppsRequest = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestSubmitApplicationWithRMHA.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestSubmitApplicationWithRMHA.java index 484ba8951fb..4cceddd2940 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestSubmitApplicationWithRMHA.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestSubmitApplicationWithRMHA.java @@ -219,4 +219,89 @@ public class TestSubmitApplicationWithRMHA extends RMHATestBase{ Assert.assertEquals(appReport3.getYarnApplicationState(), appReport4.getYarnApplicationState()); } + + // There are two scenarios when RM failover happens + // during 
SubmitApplication Call: + // 1) RMStateStore already saved the ApplicationState when failover happens + // 2) RMStateStore did not save the ApplicationState when failover happens + @Test (timeout = 5000) + public void + testHandleRMHADuringSubmitApplicationCallWithSavedApplicationState() + throws Exception { + // Test scenario 1 when RM failover happens + // druing SubmitApplication Call: + // RMStateStore already saved the ApplicationState when failover happens + startRMs(); + + // Submit Application + // After submission, the applicationState will be saved in RMStateStore. + RMApp app0 = rm1.submitApp(200); + + // Do the failover + explicitFailover(); + + // Since the applicationState has already been saved in RMStateStore + // before failover happens, the current active rm can load the previous + // applicationState. + // This RMApp should exist in the RMContext of current active RM + Assert.assertTrue(rm2.getRMContext().getRMApps() + .containsKey(app0.getApplicationId())); + + // When we re-submit the application with same applicationId, it will + // check whether this application has been exist. If yes, just simply + // return submitApplicationResponse. + RMApp app1 = + rm2.submitApp(200, "", UserGroupInformation + .getCurrentUser().getShortUserName(), null, false, null, + configuration.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, + YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null, + false, false, true, app0.getApplicationId()); + + Assert.assertEquals(app1.getApplicationId(), app0.getApplicationId()); + } + + @Test (timeout = 5000) + public void + testHandleRMHADuringSubmitApplicationCallWithoutSavedApplicationState() + throws Exception { + // Test scenario 2 when RM failover happens + // during SubmitApplication Call: + // RMStateStore did not save the ApplicationState when failover happens. + // Using customized RMAppManager. + startRMsWithCustomizedRMAppManager(); + + // Submit Application + // After submission, the applicationState will + // not be saved in RMStateStore + RMApp app0 = + rm1.submitApp(200, "", UserGroupInformation + .getCurrentUser().getShortUserName(), null, false, null, + configuration.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, + YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null, + false, false); + + // Do the failover + explicitFailover(); + + // When failover happens, the RMStateStore has not saved applicationState. + // The applicationState of this RMApp is lost. + // We should not find the RMApp in the RMContext of current active rm. 
+ Assert.assertFalse(rm2.getRMContext().getRMApps() + .containsKey(app0.getApplicationId())); + + // Submit the application with previous ApplicationId to current active RM + // This will mimic the similar behavior of ApplicationClientProtocol# + // submitApplication() when failover happens during the submission process + // because the submitApplication api is marked as idempotent + RMApp app1 = + rm2.submitApp(200, "", UserGroupInformation + .getCurrentUser().getShortUserName(), null, false, null, + configuration.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, + YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null, + false, false, true, app0.getApplicationId()); + + verifySubmitApp(rm2, app1, app0.getApplicationId()); + Assert.assertTrue(rm2.getRMContext().getRMApps() + .containsKey(app0.getApplicationId())); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java index 0adbf65d603..8632815f33f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java @@ -25,7 +25,6 @@ import java.net.UnknownHostException; import java.util.Collection; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; @@ -243,12 +242,7 @@ public class MiniYARNCluster extends CompositeService { } for (int i = 0; i < resourceManagers.length; i++) { - resourceManagers[i] = new ResourceManager() { - @Override - protected void doSecureLogin() throws IOException { - // Don't try to login using keytab in the testcases. - } - }; + resourceManagers[i] = createResourceManager(); if (!useFixedPorts) { if (HAUtil.isHAEnabled(conf)) { setHARMConfiguration(i, conf); @@ -676,7 +670,7 @@ public class MiniYARNCluster extends CompositeService { } return false; } - + private class ApplicationHistoryServerWrapper extends AbstractService { public ApplicationHistoryServerWrapper() { super(ApplicationHistoryServerWrapper.class.getName()); @@ -736,4 +730,13 @@ public class MiniYARNCluster extends CompositeService { public ApplicationHistoryServer getApplicationHistoryServer() { return this.appHistoryServer; } + + protected ResourceManager createResourceManager() { + return new ResourceManager(){ + @Override + protected void doSecureLogin() throws IOException { + // Don't try to login using keytab in the testcases. + } + }; + } }
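
The annotations this patch adds come from org.apache.hadoop.io.retry: @Idempotent marks calls that can safely be replayed against the newly active ResourceManager after a failover, while @AtMostOnce marks calls whose most recent invocation the server can tolerate seeing twice. Below is a minimal, illustrative sketch of what the annotations express to a retry layer; the class name RetryAnnotationProbe and the isRetriable helper are hypothetical and are not part of this patch or of Hadoop's actual failover machinery, which lives in the RM proxy and retry-policy code.

// Minimal, illustrative sketch only: RetryAnnotationProbe and isRetriable are
// hypothetical names, not part of this patch or of Hadoop itself. The sketch
// inspects the retry annotations added above to show what they communicate to
// a retry/failover layer; Hadoop's real RM proxies implement the actual logic.
import java.lang.reflect.Method;

import org.apache.hadoop.io.retry.AtMostOnce;
import org.apache.hadoop.io.retry.Idempotent;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;

public class RetryAnnotationProbe {

  // A call is a candidate for replay after RM failover if it is idempotent
  // (safe to repeat any number of times) or at-most-once (the server can
  // absorb a duplicate of the most recent call).
  static boolean isRetriable(Method method) {
    return method.isAnnotationPresent(Idempotent.class)
        || method.isAnnotationPresent(AtMostOnce.class);
  }

  public static void main(String[] args) {
    for (Method m : ApplicationClientProtocol.class.getMethods()) {
      System.out.println(m.getName() + " -> retriable=" + isRetriable(m));
    }
  }
}

Under this reading, submitApplication can be marked @Idempotent only because the ClientRMService change above turns a duplicate applicationId into a normal SubmitApplicationResponse instead of an exception, and nodeHeartbeat in ResourceTracker is @AtMostOnce rather than @Idempotent, presumably because only a duplicate of the latest heartbeat, not an arbitrary replay, can be absorbed by the ResourceManager.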