Merge r1581678 from trunk. YARN-1521. Mark Idempotent/AtMostOnce annotations to the APIs in ApplicationClientProtcol, ResourceManagerAdministrationProtocol and ResourceTrackerProtocol so that they work in HA scenario. Contributed by Xuan Gong

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1581679 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jian He 2014-03-26 03:48:46 +00:00
parent 7bd77e23d7
commit 859f27040d
12 changed files with 1162 additions and 21 deletions

View File

@ -565,6 +565,11 @@ Release 2.4.0 - UNRELEASED
YARN-1867. Fixed a bug in ResourceManager that was causing invalid ACL checks
in the web-services after fail-over. (Vinod Kumar Vavilapalli)
YARN-1521. Mark Idempotent/AtMostOnce annotations to the APIs in
ApplicationClientProtcol, ResourceManagerAdministrationProtocol and
ResourceTrackerProtocol so that they work in HA scenario. (Xuan Gong
via jianhe)
Release 2.3.1 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -104,6 +104,7 @@ public interface ApplicationClientProtocol {
*/
@Public
@Stable
@Idempotent
public GetNewApplicationResponse getNewApplication(
GetNewApplicationRequest request)
throws YarnException, IOException;
@ -133,6 +134,10 @@ public GetNewApplicationResponse getNewApplication(
* it encounters the {@link ApplicationNotFoundException} on the
* {@link #getApplicationReport(GetApplicationReportRequest)} call.</p>
*
* <p>During the submission process, it checks whether the application
* already exists. If the application exists, it will simply return
* SubmitApplicationResponse</p>
*
* <p> In secure mode,the <code>ResourceManager</code> verifies access to
* queues etc. before accepting the application submission.</p>
*
@ -147,6 +152,7 @@ public GetNewApplicationResponse getNewApplication(
*/
@Public
@Stable
@Idempotent
public SubmitApplicationResponse submitApplication(
SubmitApplicationRequest request)
throws YarnException, IOException;
@ -173,6 +179,7 @@ public SubmitApplicationResponse submitApplication(
*/
@Public
@Stable
@Idempotent
public KillApplicationResponse forceKillApplication(
KillApplicationRequest request)
throws YarnException, IOException;
@ -231,6 +238,7 @@ public GetApplicationReportResponse getApplicationReport(
*/
@Public
@Stable
@Idempotent
public GetClusterMetricsResponse getClusterMetrics(
GetClusterMetricsRequest request)
throws YarnException, IOException;
@ -258,6 +266,7 @@ public GetClusterMetricsResponse getClusterMetrics(
*/
@Public
@Stable
@Idempotent
public GetApplicationsResponse getApplications(
GetApplicationsRequest request)
throws YarnException, IOException;
@ -277,6 +286,7 @@ public GetApplicationsResponse getApplications(
*/
@Public
@Stable
@Idempotent
public GetClusterNodesResponse getClusterNodes(
GetClusterNodesRequest request)
throws YarnException, IOException;
@ -298,6 +308,7 @@ public GetClusterNodesResponse getClusterNodes(
*/
@Public
@Stable
@Idempotent
public GetQueueInfoResponse getQueueInfo(
GetQueueInfoRequest request)
throws YarnException, IOException;
@ -317,6 +328,7 @@ public GetQueueInfoResponse getQueueInfo(
*/
@Public
@Stable
@Idempotent
public GetQueueUserAclsInfoResponse getQueueUserAcls(
GetQueueUserAclsInfoRequest request)
throws YarnException, IOException;
@ -335,6 +347,7 @@ public GetQueueUserAclsInfoResponse getQueueUserAcls(
*/
@Public
@Stable
@Idempotent
public GetDelegationTokenResponse getDelegationToken(
GetDelegationTokenRequest request)
throws YarnException, IOException;
@ -349,6 +362,7 @@ public GetDelegationTokenResponse getDelegationToken(
*/
@Private
@Unstable
@Idempotent
public RenewDelegationTokenResponse renewDelegationToken(
RenewDelegationTokenRequest request) throws YarnException,
IOException;
@ -363,6 +377,7 @@ public RenewDelegationTokenResponse renewDelegationToken(
*/
@Private
@Unstable
@Idempotent
public CancelDelegationTokenResponse cancelDelegationToken(
CancelDelegationTokenRequest request) throws YarnException,
IOException;
@ -377,6 +392,7 @@ public CancelDelegationTokenResponse cancelDelegationToken(
*/
@Public
@Unstable
@Idempotent
public MoveApplicationAcrossQueuesResponse moveApplicationAcrossQueues(
MoveApplicationAcrossQueuesRequest request) throws YarnException, IOException;
@ -422,6 +438,7 @@ public MoveApplicationAcrossQueuesResponse moveApplicationAcrossQueues(
*/
@Public
@Unstable
@Idempotent
public GetApplicationAttemptReportResponse getApplicationAttemptReport(
GetApplicationAttemptReportRequest request) throws YarnException,
IOException;
@ -453,6 +470,7 @@ public GetApplicationAttemptReportResponse getApplicationAttemptReport(
*/
@Public
@Unstable
@Idempotent
public GetApplicationAttemptsResponse getApplicationAttempts(
GetApplicationAttemptsRequest request) throws YarnException, IOException;
@ -486,6 +504,7 @@ public GetApplicationAttemptsResponse getApplicationAttempts(
*/
@Public
@Unstable
@Idempotent
public GetContainerReportResponse getContainerReport(
GetContainerReportRequest request) throws YarnException, IOException;
@ -520,6 +539,7 @@ public GetContainerReportResponse getContainerReport(
*/
@Public
@Unstable
@Idempotent
public GetContainersResponse getContainers(GetContainersRequest request)
throws YarnException, IOException;

View File

@ -24,6 +24,7 @@
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.io.retry.Idempotent;
import org.apache.hadoop.ipc.StandbyException;
import org.apache.hadoop.tools.GetUserMappingsProtocol;
import org.apache.hadoop.yarn.api.records.NodeId;
@ -50,16 +51,19 @@ public interface ResourceManagerAdministrationProtocol extends GetUserMappingsPr
@Public
@Stable
@Idempotent
public RefreshQueuesResponse refreshQueues(RefreshQueuesRequest request)
throws StandbyException, YarnException, IOException;
@Public
@Stable
@Idempotent
public RefreshNodesResponse refreshNodes(RefreshNodesRequest request)
throws StandbyException, YarnException, IOException;
@Public
@Stable
@Idempotent
public RefreshSuperUserGroupsConfigurationResponse
refreshSuperUserGroupsConfiguration(
RefreshSuperUserGroupsConfigurationRequest request)
@ -67,18 +71,21 @@ public RefreshNodesResponse refreshNodes(RefreshNodesRequest request)
@Public
@Stable
@Idempotent
public RefreshUserToGroupsMappingsResponse refreshUserToGroupsMappings(
RefreshUserToGroupsMappingsRequest request)
throws StandbyException, YarnException, IOException;
@Public
@Stable
@Idempotent
public RefreshAdminAclsResponse refreshAdminAcls(
RefreshAdminAclsRequest request)
throws YarnException, IOException;
@Public
@Stable
@Idempotent
public RefreshServiceAclsResponse refreshServiceAcls(
RefreshServiceAclsRequest request)
throws YarnException, IOException;
@ -99,6 +106,7 @@ public RefreshServiceAclsResponse refreshServiceAcls(
*/
@Public
@Evolving
@Idempotent
public UpdateNodeResourceResponse updateNodeResource(
UpdateNodeResourceRequest request)
throws YarnException, IOException;

View File

@ -0,0 +1,721 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.client;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import junit.framework.Assert;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ha.ClientBaseWithFixes;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainersRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainersResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerReport;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.QueueState;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.HAUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.MiniYARNCluster;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
import org.apache.hadoop.yarn.server.resourcemanager.AdminService;
import org.apache.hadoop.yarn.server.resourcemanager.ClientRMService;
import org.apache.hadoop.yarn.server.resourcemanager.NMLivelinessMonitor;
import org.apache.hadoop.yarn.server.resourcemanager.NodesListManager;
import org.apache.hadoop.yarn.server.resourcemanager.RMAppManager;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.QueueACLsManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.util.Records;
import org.junit.After;
import org.junit.Before;
public abstract class ProtocolHATestBase extends ClientBaseWithFixes{
protected static final HAServiceProtocol.StateChangeRequestInfo req =
new HAServiceProtocol.StateChangeRequestInfo(
HAServiceProtocol.RequestSource.REQUEST_BY_USER);
protected static final String RM1_NODE_ID = "rm1";
protected static final int RM1_PORT_BASE = 10000;
protected static final String RM2_NODE_ID = "rm2";
protected static final int RM2_PORT_BASE = 20000;
protected Configuration conf;
protected MiniYARNClusterForHATesting cluster;
protected Thread failoverThread = null;
private volatile boolean keepRunning;
private void setConfForRM(String rmId, String prefix, String value) {
conf.set(HAUtil.addSuffix(prefix, rmId), value);
}
private void setRpcAddressForRM(String rmId, int base) {
setConfForRM(rmId, YarnConfiguration.RM_ADDRESS, "0.0.0.0:" +
(base + YarnConfiguration.DEFAULT_RM_PORT));
setConfForRM(rmId, YarnConfiguration.RM_SCHEDULER_ADDRESS, "0.0.0.0:" +
(base + YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT));
setConfForRM(rmId, YarnConfiguration.RM_ADMIN_ADDRESS, "0.0.0.0:" +
(base + YarnConfiguration.DEFAULT_RM_ADMIN_PORT));
setConfForRM(rmId, YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS,
"0.0.0.0:" + (base + YarnConfiguration
.DEFAULT_RM_RESOURCE_TRACKER_PORT));
setConfForRM(rmId, YarnConfiguration.RM_WEBAPP_ADDRESS, "0.0.0.0:" +
(base + YarnConfiguration.DEFAULT_RM_WEBAPP_PORT));
setConfForRM(rmId, YarnConfiguration.RM_WEBAPP_HTTPS_ADDRESS, "0.0.0.0:" +
(base + YarnConfiguration.DEFAULT_RM_WEBAPP_HTTPS_PORT));
}
@Before
public void setup() throws IOException {
failoverThread = null;
keepRunning = true;
conf = new YarnConfiguration();
conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
conf.setInt(YarnConfiguration.CLIENT_FAILOVER_MAX_ATTEMPTS, 5);
conf.set(YarnConfiguration.RM_HA_IDS, RM1_NODE_ID + "," + RM2_NODE_ID);
setRpcAddressForRM(RM1_NODE_ID, RM1_PORT_BASE);
setRpcAddressForRM(RM2_NODE_ID, RM2_PORT_BASE);
conf.setLong(YarnConfiguration.CLIENT_FAILOVER_SLEEPTIME_BASE_MS, 100L);
conf.setBoolean(YarnConfiguration.YARN_MINICLUSTER_FIXED_PORTS, true);
conf.setBoolean(YarnConfiguration.YARN_MINICLUSTER_USE_RPC, true);
}
@After
public void teardown() throws Exception {
keepRunning = false;
if (failoverThread != null) {
failoverThread.interrupt();
try {
failoverThread.join();
} catch (InterruptedException ex) {
LOG.error("Error joining with failover thread", ex);
}
}
cluster.stop();
}
protected AdminService getAdminService(int index) {
return cluster.getResourceManager(index).getRMContext()
.getRMAdminService();
}
protected void explicitFailover() throws IOException {
int activeRMIndex = cluster.getActiveRMIndex();
int newActiveRMIndex = (activeRMIndex + 1) % 2;
getAdminService(activeRMIndex).transitionToStandby(req);
getAdminService(newActiveRMIndex).transitionToActive(req);
assertEquals("Failover failed", newActiveRMIndex,
cluster.getActiveRMIndex());
}
protected YarnClient createAndStartYarnClient(Configuration conf) {
Configuration configuration = new YarnConfiguration(conf);
YarnClient client = YarnClient.createYarnClient();
client.init(configuration);
client.start();
return client;
}
protected void verifyConnections() throws InterruptedException,
YarnException {
assertTrue("NMs failed to connect to the RM",
cluster.waitForNodeManagersToConnect(20000));
verifyClientConnection();
}
protected void verifyClientConnection() {
int numRetries = 3;
while(numRetries-- > 0) {
Configuration conf = new YarnConfiguration(this.conf);
YarnClient client = createAndStartYarnClient(conf);
try {
Thread.sleep(100);
client.getApplications();
return;
} catch (Exception e) {
LOG.error(e.getMessage());
} finally {
client.stop();
}
}
fail("Client couldn't connect to the Active RM");
}
protected Thread createAndStartFailoverThread() {
Thread failoverThread = new Thread() {
public void run() {
keepRunning = true;
while (keepRunning) {
if (cluster.getStartFailoverFlag()) {
try {
explicitFailover();
keepRunning = false;
cluster.resetFailoverTriggeredFlag(true);
} catch (Exception e) {
// Do Nothing
} finally {
keepRunning = false;
}
}
try {
Thread.sleep(50);
} catch (InterruptedException e) {
// DO NOTHING
}
}
}
};
failoverThread.start();
return failoverThread;
}
protected void startHACluster(int numOfNMs, boolean overrideClientRMService,
boolean overrideRTS) throws Exception {
conf.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false);
cluster =
new MiniYARNClusterForHATesting(TestRMFailover.class.getName(), 2,
numOfNMs, 1, 1, false, overrideClientRMService, overrideRTS);
cluster.resetStartFailoverFlag(false);
cluster.init(conf);
cluster.start();
getAdminService(0).transitionToActive(req);
assertFalse("RM never turned active", -1 == cluster.getActiveRMIndex());
verifyConnections();
// Do the failover
explicitFailover();
verifyConnections();
failoverThread = createAndStartFailoverThread();
}
protected ResourceManager getActiveRM() {
return cluster.getResourceManager(cluster.getActiveRMIndex());
}
public class MiniYARNClusterForHATesting extends MiniYARNCluster {
private boolean overrideClientRMService;
private boolean overrideRTS;
private final AtomicBoolean startFailover = new AtomicBoolean(false);
private final AtomicBoolean failoverTriggered = new AtomicBoolean(false);
public MiniYARNClusterForHATesting(String testName,
int numResourceManagers, int numNodeManagers, int numLocalDirs,
int numLogDirs, boolean enableAHS, boolean overrideClientRMService,
boolean overrideRTS) {
super(testName, numResourceManagers, numNodeManagers, numLocalDirs,
numLogDirs, enableAHS);
this.overrideClientRMService = overrideClientRMService;
this.overrideRTS = overrideRTS;
}
public boolean getStartFailoverFlag() {
return startFailover.get();
}
public void resetStartFailoverFlag(boolean flag) {
startFailover.set(flag);
}
public void resetFailoverTriggeredFlag(boolean flag) {
failoverTriggered.set(flag);
}
private boolean waittingForFailOver() {
int maximumWaittingTime = 50;
int count = 0;
while (!failoverTriggered.get() && count >= maximumWaittingTime) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
// DO NOTHING
}
count++;
}
if (count >= maximumWaittingTime) {
return false;
}
return true;
}
@Override
protected ResourceManager createResourceManager() {
return new ResourceManager() {
@Override
protected void doSecureLogin() throws IOException {
// Don't try to login using keytab in the testcases.
}
@Override
protected ClientRMService createClientRMService() {
if (overrideClientRMService) {
return new CustomedClientRMService(this.rmContext, this.scheduler,
this.rmAppManager, this.applicationACLsManager,
this.queueACLsManager,
this.rmContext.getRMDelegationTokenSecretManager());
}
return super.createClientRMService();
}
@Override
protected ResourceTrackerService createResourceTrackerService() {
if (overrideRTS) {
return new CustomedResourceTrackerService(this.rmContext,
this.nodesListManager, this.nmLivelinessMonitor,
this.rmContext.getContainerTokenSecretManager(),
this.rmContext.getNMTokenSecretManager());
}
return super.createResourceTrackerService();
}
};
}
private class CustomedClientRMService extends ClientRMService {
public CustomedClientRMService(RMContext rmContext,
YarnScheduler scheduler, RMAppManager rmAppManager,
ApplicationACLsManager applicationACLsManager,
QueueACLsManager queueACLsManager,
RMDelegationTokenSecretManager rmDTSecretManager) {
super(rmContext, scheduler, rmAppManager, applicationACLsManager,
queueACLsManager, rmDTSecretManager);
}
@Override
public GetNewApplicationResponse getNewApplication(
GetNewApplicationRequest request) throws YarnException {
resetStartFailoverFlag(true);
// make sure failover has been triggered
Assert.assertTrue(waittingForFailOver());
// create the GetNewApplicationResponse with fake applicationId
GetNewApplicationResponse response =
GetNewApplicationResponse.newInstance(
createFakeAppId(), null, null);
return response;
}
@Override
public GetApplicationReportResponse getApplicationReport(
GetApplicationReportRequest request) throws YarnException {
resetStartFailoverFlag(true);
// make sure failover has been triggered
Assert.assertTrue(waittingForFailOver());
// create a fake application report
ApplicationReport report = createFakeAppReport();
GetApplicationReportResponse response =
GetApplicationReportResponse.newInstance(report);
return response;
}
@Override
public GetClusterMetricsResponse getClusterMetrics(
GetClusterMetricsRequest request) throws YarnException {
resetStartFailoverFlag(true);
// make sure failover has been triggered
Assert.assertTrue(waittingForFailOver());
// create GetClusterMetricsResponse with fake YarnClusterMetrics
GetClusterMetricsResponse response =
GetClusterMetricsResponse.newInstance(
createFakeYarnClusterMetrics());
return response;
}
@Override
public GetApplicationsResponse getApplications(
GetApplicationsRequest request) throws YarnException {
resetStartFailoverFlag(true);
// make sure failover has been triggered
Assert.assertTrue(waittingForFailOver());
// create GetApplicationsResponse with fake applicationList
GetApplicationsResponse response =
GetApplicationsResponse.newInstance(createFakeAppReports());
return response;
}
@Override
public GetClusterNodesResponse getClusterNodes(
GetClusterNodesRequest request)
throws YarnException {
resetStartFailoverFlag(true);
// make sure failover has been triggered
Assert.assertTrue(waittingForFailOver());
// create GetClusterNodesResponse with fake ClusterNodeLists
GetClusterNodesResponse response =
GetClusterNodesResponse.newInstance(createFakeNodeReports());
return response;
}
@Override
public GetQueueInfoResponse getQueueInfo(GetQueueInfoRequest request)
throws YarnException {
resetStartFailoverFlag(true);
// make sure failover has been triggered
Assert.assertTrue(waittingForFailOver());
// return fake QueueInfo
return GetQueueInfoResponse.newInstance(createFakeQueueInfo());
}
@Override
public GetQueueUserAclsInfoResponse getQueueUserAcls(
GetQueueUserAclsInfoRequest request) throws YarnException {
resetStartFailoverFlag(true);
// make sure failover has been triggered
Assert.assertTrue(waittingForFailOver());
// return fake queueUserAcls
return GetQueueUserAclsInfoResponse
.newInstance(createFakeQueueUserACLInfoList());
}
@Override
public GetApplicationAttemptReportResponse getApplicationAttemptReport(
GetApplicationAttemptReportRequest request) throws YarnException,
IOException {
resetStartFailoverFlag(true);
// make sure failover has been triggered
Assert.assertTrue(waittingForFailOver());
// return fake ApplicationAttemptReport
return GetApplicationAttemptReportResponse
.newInstance(createFakeApplicationAttemptReport());
}
@Override
public GetApplicationAttemptsResponse getApplicationAttempts(
GetApplicationAttemptsRequest request) throws YarnException,
IOException {
resetStartFailoverFlag(true);
// make sure failover has been triggered
Assert.assertTrue(waittingForFailOver());
// return fake ApplicationAttemptReports
return GetApplicationAttemptsResponse
.newInstance(createFakeApplicationAttemptReports());
}
@Override
public GetContainerReportResponse getContainerReport(
GetContainerReportRequest request) throws YarnException,
IOException {
resetStartFailoverFlag(true);
// make sure failover has been triggered
Assert.assertTrue(waittingForFailOver());
// return fake containerReport
return GetContainerReportResponse
.newInstance(createFakeContainerReport());
}
@Override
public GetContainersResponse getContainers(GetContainersRequest request)
throws YarnException, IOException {
resetStartFailoverFlag(true);
// make sure failover has been triggered
Assert.assertTrue(waittingForFailOver());
// return fake ContainerReports
return GetContainersResponse.newInstance(createFakeContainerReports());
}
@Override
public SubmitApplicationResponse submitApplication(
SubmitApplicationRequest request) throws YarnException {
resetStartFailoverFlag(true);
// make sure failover has been triggered
Assert.assertTrue(waittingForFailOver());
return super.submitApplication(request);
}
@Override
public KillApplicationResponse forceKillApplication(
KillApplicationRequest request) throws YarnException {
resetStartFailoverFlag(true);
// make sure failover has been triggered
Assert.assertTrue(waittingForFailOver());
return KillApplicationResponse.newInstance(true);
}
@Override
public MoveApplicationAcrossQueuesResponse moveApplicationAcrossQueues(
MoveApplicationAcrossQueuesRequest request) throws YarnException {
resetStartFailoverFlag(true);
// make sure failover has been triggered
Assert.assertTrue(waittingForFailOver());
return Records.newRecord(MoveApplicationAcrossQueuesResponse.class);
}
@Override
public GetDelegationTokenResponse getDelegationToken(
GetDelegationTokenRequest request) throws YarnException {
resetStartFailoverFlag(true);
// make sure failover has been triggered
Assert.assertTrue(waittingForFailOver());
return GetDelegationTokenResponse.newInstance(createFakeToken());
}
@Override
public RenewDelegationTokenResponse renewDelegationToken(
RenewDelegationTokenRequest request) throws YarnException {
resetStartFailoverFlag(true);
// make sure failover has been triggered
Assert.assertTrue(waittingForFailOver());
return RenewDelegationTokenResponse
.newInstance(createNextExpirationTime());
}
@Override
public CancelDelegationTokenResponse cancelDelegationToken(
CancelDelegationTokenRequest request) throws YarnException {
resetStartFailoverFlag(true);
// make sure failover has been triggered
Assert.assertTrue(waittingForFailOver());
return CancelDelegationTokenResponse.newInstance();
}
}
public ApplicationReport createFakeAppReport() {
ApplicationId appId = ApplicationId.newInstance(1000l, 1);
ApplicationAttemptId attemptId =
ApplicationAttemptId.newInstance(appId, 1);
// create a fake application report
ApplicationReport report =
ApplicationReport.newInstance(appId, attemptId, "fakeUser",
"fakeQueue", "fakeApplicationName", "localhost", 0, null,
YarnApplicationState.FAILED, "fake an application report", "",
1000l, 1200l, FinalApplicationStatus.FAILED, null, "", 50f,
"fakeApplicationType", null);
return report;
}
public List<ApplicationReport> createFakeAppReports() {
List<ApplicationReport> reports = new ArrayList<ApplicationReport>();
reports.add(createFakeAppReport());
return reports;
}
public ApplicationId createFakeAppId() {
return ApplicationId.newInstance(1000l, 1);
}
public ApplicationAttemptId createFakeApplicationAttemptId() {
return ApplicationAttemptId.newInstance(createFakeAppId(), 0);
}
public ContainerId createFakeContainerId() {
return ContainerId.newInstance(createFakeApplicationAttemptId(), 0);
}
public YarnClusterMetrics createFakeYarnClusterMetrics() {
return YarnClusterMetrics.newInstance(1);
}
public List<NodeReport> createFakeNodeReports() {
NodeId nodeId = NodeId.newInstance("localhost", 0);
NodeReport report =
NodeReport.newInstance(nodeId, NodeState.RUNNING, "localhost",
"rack1", null, null, 4, null, 1000l);
List<NodeReport> reports = new ArrayList<NodeReport>();
reports.add(report);
return reports;
}
public QueueInfo createFakeQueueInfo() {
return QueueInfo.newInstance("root", 100f, 100f, 50f, null,
createFakeAppReports(), QueueState.RUNNING);
}
public List<QueueUserACLInfo> createFakeQueueUserACLInfoList() {
List<QueueACL> queueACL = new ArrayList<QueueACL>();
queueACL.add(QueueACL.SUBMIT_APPLICATIONS);
QueueUserACLInfo info = QueueUserACLInfo.newInstance("root", queueACL);
List<QueueUserACLInfo> infos = new ArrayList<QueueUserACLInfo>();
infos.add(info);
return infos;
}
public ApplicationAttemptReport createFakeApplicationAttemptReport() {
return ApplicationAttemptReport.newInstance(
createFakeApplicationAttemptId(), "localhost", 0, "", "",
YarnApplicationAttemptState.RUNNING, createFakeContainerId());
}
public List<ApplicationAttemptReport>
createFakeApplicationAttemptReports() {
List<ApplicationAttemptReport> reports =
new ArrayList<ApplicationAttemptReport>();
reports.add(createFakeApplicationAttemptReport());
return reports;
}
public ContainerReport createFakeContainerReport() {
return ContainerReport.newInstance(createFakeContainerId(), null,
NodeId.newInstance("localhost", 0), null, 1000l, 1200l, "", "", 0,
ContainerState.COMPLETE);
}
public List<ContainerReport> createFakeContainerReports() {
List<ContainerReport> reports =
new ArrayList<ContainerReport>();
reports.add(createFakeContainerReport());
return reports;
}
public Token createFakeToken() {
String identifier = "fake Token";
String password = "fake token passwd";
Token token = Token.newInstance(
identifier.getBytes(), " ", password.getBytes(), " ");
return token;
}
public long createNextExpirationTime() {
return "fake Token".getBytes().length;
}
private class CustomedResourceTrackerService extends
ResourceTrackerService {
public CustomedResourceTrackerService(RMContext rmContext,
NodesListManager nodesListManager,
NMLivelinessMonitor nmLivelinessMonitor,
RMContainerTokenSecretManager containerTokenSecretManager,
NMTokenSecretManagerInRM nmTokenSecretManager) {
super(rmContext, nodesListManager, nmLivelinessMonitor,
containerTokenSecretManager, nmTokenSecretManager);
}
@Override
public RegisterNodeManagerResponse registerNodeManager(
RegisterNodeManagerRequest request) throws YarnException,
IOException {
resetStartFailoverFlag(true);
// make sure failover has been triggered
Assert.assertTrue(waittingForFailOver());
return super.registerNodeManager(request);
}
@Override
public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
throws YarnException, IOException {
resetStartFailoverFlag(true);
// make sure failover has been triggered
Assert.assertTrue(waittingForFailOver());
return super.nodeHeartbeat(request);
}
}
}
}

View File

@ -0,0 +1,211 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.client;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerReport;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.Records;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class TestApplicationClientProtocolOnHA extends ProtocolHATestBase {
private YarnClient client = null;
@Before
public void initiate() throws Exception {
startHACluster(1, true, false);
Configuration conf = new YarnConfiguration(this.conf);
client = createAndStartYarnClient(conf);
}
@After
public void shutDown() {
if (client != null) {
client.stop();
}
}
@Test(timeout = 15000)
public void testGetApplicationReportOnHA() throws Exception {
ApplicationReport report =
client.getApplicationReport(cluster.createFakeAppId());
Assert.assertTrue(report != null);
Assert.assertEquals(cluster.createFakeAppReport(), report);
}
@Test(timeout = 15000)
public void testGetNewApplicationOnHA() throws Exception {
ApplicationId appId =
client.createApplication().getApplicationSubmissionContext()
.getApplicationId();
Assert.assertTrue(appId != null);
Assert.assertEquals(cluster.createFakeAppId(), appId);
}
@Test(timeout = 15000)
public void testGetClusterMetricsOnHA() throws Exception {
YarnClusterMetrics clusterMetrics =
client.getYarnClusterMetrics();
Assert.assertTrue(clusterMetrics != null);
Assert.assertEquals(cluster.createFakeYarnClusterMetrics(),
clusterMetrics);
}
@Test(timeout = 15000)
public void testGetApplicationsOnHA() throws Exception {
List<ApplicationReport> reports =
client.getApplications();
Assert.assertTrue(reports != null && !reports.isEmpty());
Assert.assertEquals(cluster.createFakeAppReports(),
reports);
}
@Test(timeout = 15000)
public void testGetClusterNodesOnHA() throws Exception {
List<NodeReport> reports = client.getNodeReports(NodeState.RUNNING);
Assert.assertTrue(reports != null && !reports.isEmpty());
Assert.assertEquals(cluster.createFakeNodeReports(),
reports);
}
@Test(timeout = 15000)
public void testGetQueueInfoOnHA() throws Exception {
QueueInfo queueInfo = client.getQueueInfo("root");
Assert.assertTrue(queueInfo != null);
Assert.assertEquals(cluster.createFakeQueueInfo(),
queueInfo);
}
@Test(timeout = 15000)
public void testGetQueueUserAclsOnHA() throws Exception {
List<QueueUserACLInfo> queueUserAclsList = client.getQueueAclsInfo();
Assert.assertTrue(queueUserAclsList != null
&& !queueUserAclsList.isEmpty());
Assert.assertEquals(cluster.createFakeQueueUserACLInfoList(),
queueUserAclsList);
}
@Test(timeout = 15000)
public void testGetApplicationAttemptReportOnHA() throws Exception {
ApplicationAttemptReport report =
client.getApplicationAttemptReport(cluster
.createFakeApplicationAttemptId());
Assert.assertTrue(report != null);
Assert.assertEquals(cluster.createFakeApplicationAttemptReport(), report);
}
@Test(timeout = 15000)
public void testGetApplicationAttemptsOnHA() throws Exception {
List<ApplicationAttemptReport> reports =
client.getApplicationAttempts(cluster.createFakeAppId());
Assert.assertTrue(reports != null && !reports.isEmpty());
Assert.assertEquals(cluster.createFakeApplicationAttemptReports(),
reports);
}
@Test(timeout = 15000)
public void testGetContainerReportOnHA() throws Exception {
ContainerReport report =
client.getContainerReport(cluster.createFakeContainerId());
Assert.assertTrue(report != null);
Assert.assertEquals(cluster.createFakeContainerReport(), report);
}
@Test(timeout = 15000)
public void testGetContainersOnHA() throws Exception {
List<ContainerReport> reports =
client.getContainers(cluster.createFakeApplicationAttemptId());
Assert.assertTrue(reports != null && !reports.isEmpty());
Assert.assertEquals(cluster.createFakeContainerReports(),
reports);
}
@Test(timeout = 15000)
public void testSubmitApplicationOnHA() throws Exception {
ApplicationSubmissionContext appContext =
Records.newRecord(ApplicationSubmissionContext.class);
appContext.setApplicationId(cluster.createFakeAppId());
ContainerLaunchContext amContainer =
Records.newRecord(ContainerLaunchContext.class);
appContext.setAMContainerSpec(amContainer);
Resource capability = Records.newRecord(Resource.class);
capability.setMemory(10);
capability.setVirtualCores(1);
appContext.setResource(capability);
ApplicationId appId = client.submitApplication(appContext);
Assert.assertTrue(getActiveRM().getRMContext().getRMApps()
.containsKey(appId));
}
@Test(timeout = 15000)
public void testMoveApplicationAcrossQueuesOnHA() throws Exception{
client.moveApplicationAcrossQueues(cluster.createFakeAppId(), "root");
}
@Test(timeout = 15000)
public void testForceKillApplicationOnHA() throws Exception {
client.killApplication(cluster.createFakeAppId());
}
@Test(timeout = 15000)
public void testGetDelegationTokenOnHA() throws Exception {
Token token = client.getRMDelegationToken(new Text(" "));
Assert.assertEquals(token, cluster.createFakeToken());
}
@Test(timeout = 15000)
public void testRenewDelegationTokenOnHA() throws Exception {
RenewDelegationTokenRequest request =
RenewDelegationTokenRequest.newInstance(cluster.createFakeToken());
long newExpirationTime =
ClientRMProxy.createRMProxy(this.conf, ApplicationClientProtocol.class)
.renewDelegationToken(request).getNextExpirationTime();
Assert.assertEquals(newExpirationTime, cluster.createNextExpirationTime());
}
@Test(timeout = 15000)
public void testCancelDelegationTokenOnHA() throws Exception {
CancelDelegationTokenRequest request =
CancelDelegationTokenRequest.newInstance(cluster.createFakeToken());
ClientRMProxy.createRMProxy(this.conf, ApplicationClientProtocol.class)
.cancelDelegationToken(request);
}
}

View File

@ -0,0 +1,91 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.client;
import java.io.IOException;
import junit.framework.Assert;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.api.ResourceTracker;
import org.apache.hadoop.yarn.server.api.ServerRMProxy;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import org.apache.hadoop.yarn.util.YarnVersionInfo;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public class TestResourceTrackerOnHA extends ProtocolHATestBase{
private ResourceTracker resourceTracker = null;
@Before
public void initiate() throws Exception {
startHACluster(0, false, true);
this.resourceTracker = getRMClient();
}
@After
public void shutDown() {
if(this.resourceTracker != null) {
RPC.stopProxy(this.resourceTracker);
}
}
@Test(timeout = 15000)
public void testResourceTrackerOnHA() throws Exception {
NodeId nodeId = NodeId.newInstance("localhost", 0);
Resource resource = Resource.newInstance(2048, 4);
// make sure registerNodeManager works when failover happens
RegisterNodeManagerRequest request =
RegisterNodeManagerRequest.newInstance(nodeId, 0, resource,
YarnVersionInfo.getVersion(), null);
resourceTracker.registerNodeManager(request);
Assert.assertTrue(waitForNodeManagerToConnect(10000, nodeId));
// restart the failover thread, and make sure nodeHeartbeat works
failoverThread = createAndStartFailoverThread();
NodeStatus status =
NodeStatus.newInstance(NodeId.newInstance("localhost", 0), 0, null,
null, null);
NodeHeartbeatRequest request2 =
NodeHeartbeatRequest.newInstance(status, null, null);
resourceTracker.nodeHeartbeat(request2);
}
private ResourceTracker getRMClient() throws IOException {
return ServerRMProxy.createRMProxy(this.conf, ResourceTracker.class);
}
private boolean waitForNodeManagerToConnect(int timeout, NodeId nodeId)
throws Exception {
for (int i = 0; i < timeout / 100; i++) {
if (getActiveRM().getRMContext().getRMNodes().containsKey(nodeId)) {
return true;
}
Thread.sleep(100);
}
return false;
}
}

View File

@ -19,6 +19,8 @@
import java.io.IOException;
import org.apache.hadoop.io.retry.AtMostOnce;
import org.apache.hadoop.io.retry.Idempotent;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
@ -27,10 +29,12 @@
public interface ResourceTracker {
@Idempotent
public RegisterNodeManagerResponse registerNodeManager(
RegisterNodeManagerRequest request) throws YarnException,
IOException;
@AtMostOnce
public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
throws YarnException, IOException;

View File

@ -504,16 +504,11 @@ public SubmitApplicationResponse submitApplication(
throw RPCUtil.getRemoteException(ie);
}
// Though duplication will checked again when app is put into rmContext,
// but it is good to fail the invalid submission as early as possible.
// Check whether app has already been put into rmContext,
// If it is, simply return the response
if (rmContext.getRMApps().get(applicationId) != null) {
String message = "Application with id " + applicationId +
" is already present! Cannot add a duplicate!";
LOG.warn(message);
RMAuditLogger.logFailure(user, AuditConstants.SUBMIT_APP_REQUEST,
message, "ClientRMService", "Exception in submitting application",
applicationId);
throw RPCUtil.getRemoteException(message);
LOG.info("This is an earlier submitted application: " + applicationId);
return SubmitApplicationResponse.newInstance();
}
if (submissionContext.getQueue() == null) {

View File

@ -718,7 +718,7 @@ public void transition(RMAppImpl app, RMAppEvent event) {
}
// TODO: Write out change to state store (YARN-1558)
// Also take care of RM failover
moveEvent.getResult().set(null);
}
}

View File

@ -589,10 +589,8 @@ public void handle(Event event) {}
// duplicate appId
try {
rmService.submitApplication(submitRequest2);
Assert.fail("Exception is expected.");
} catch (YarnException e) {
Assert.assertTrue("The thrown exception is not expected.",
e.getMessage().contains("Cannot add a duplicate!"));
Assert.fail("Exception is not expected.");
}
GetApplicationsRequest getAllAppsRequest =

View File

@ -219,4 +219,89 @@ public void testGetApplicationReportIdempotent() throws Exception{
Assert.assertEquals(appReport3.getYarnApplicationState(),
appReport4.getYarnApplicationState());
}
// There are two scenarios when RM failover happens
// during SubmitApplication Call:
// 1) RMStateStore already saved the ApplicationState when failover happens
// 2) RMStateStore did not save the ApplicationState when failover happens
@Test (timeout = 5000)
public void
testHandleRMHADuringSubmitApplicationCallWithSavedApplicationState()
throws Exception {
// Test scenario 1 when RM failover happens
// druing SubmitApplication Call:
// RMStateStore already saved the ApplicationState when failover happens
startRMs();
// Submit Application
// After submission, the applicationState will be saved in RMStateStore.
RMApp app0 = rm1.submitApp(200);
// Do the failover
explicitFailover();
// Since the applicationState has already been saved in RMStateStore
// before failover happens, the current active rm can load the previous
// applicationState.
// This RMApp should exist in the RMContext of current active RM
Assert.assertTrue(rm2.getRMContext().getRMApps()
.containsKey(app0.getApplicationId()));
// When we re-submit the application with same applicationId, it will
// check whether this application has been exist. If yes, just simply
// return submitApplicationResponse.
RMApp app1 =
rm2.submitApp(200, "", UserGroupInformation
.getCurrentUser().getShortUserName(), null, false, null,
configuration.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null,
false, false, true, app0.getApplicationId());
Assert.assertEquals(app1.getApplicationId(), app0.getApplicationId());
}
@Test (timeout = 5000)
public void
testHandleRMHADuringSubmitApplicationCallWithoutSavedApplicationState()
throws Exception {
// Test scenario 2 when RM failover happens
// during SubmitApplication Call:
// RMStateStore did not save the ApplicationState when failover happens.
// Using customized RMAppManager.
startRMsWithCustomizedRMAppManager();
// Submit Application
// After submission, the applicationState will
// not be saved in RMStateStore
RMApp app0 =
rm1.submitApp(200, "", UserGroupInformation
.getCurrentUser().getShortUserName(), null, false, null,
configuration.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null,
false, false);
// Do the failover
explicitFailover();
// When failover happens, the RMStateStore has not saved applicationState.
// The applicationState of this RMApp is lost.
// We should not find the RMApp in the RMContext of current active rm.
Assert.assertFalse(rm2.getRMContext().getRMApps()
.containsKey(app0.getApplicationId()));
// Submit the application with previous ApplicationId to current active RM
// This will mimic the similar behavior of ApplicationClientProtocol#
// submitApplication() when failover happens during the submission process
// because the submitApplication api is marked as idempotent
RMApp app1 =
rm2.submitApp(200, "", UserGroupInformation
.getCurrentUser().getShortUserName(), null, false, null,
configuration.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null,
false, false, true, app0.getApplicationId());
verifySubmitApp(rm2, app1, app0.getApplicationId());
Assert.assertTrue(rm2.getRMContext().getRMApps()
.containsKey(app0.getApplicationId()));
}
}

View File

@ -25,7 +25,6 @@
import java.util.Collection;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
@ -243,12 +242,7 @@ public void serviceInit(Configuration conf) throws Exception {
}
for (int i = 0; i < resourceManagers.length; i++) {
resourceManagers[i] = new ResourceManager() {
@Override
protected void doSecureLogin() throws IOException {
// Don't try to login using keytab in the testcases.
}
};
resourceManagers[i] = createResourceManager();
if (!useFixedPorts) {
if (HAUtil.isHAEnabled(conf)) {
setHARMConfiguration(i, conf);
@ -676,7 +670,7 @@ public boolean waitForNodeManagersToConnect(long timeout)
}
return false;
}
private class ApplicationHistoryServerWrapper extends AbstractService {
public ApplicationHistoryServerWrapper() {
super(ApplicationHistoryServerWrapper.class.getName());
@ -736,4 +730,13 @@ protected synchronized void serviceStop() throws Exception {
public ApplicationHistoryServer getApplicationHistoryServer() {
return this.appHistoryServer;
}
protected ResourceManager createResourceManager() {
return new ResourceManager(){
@Override
protected void doSecureLogin() throws IOException {
// Don't try to login using keytab in the testcases.
}
};
}
}