From 3532d96ff67d9aa5e9a243bc58f39b67d1de8d18 Mon Sep 17 00:00:00 2001 From: Jian He Date: Sun, 30 Mar 2014 21:35:54 +0000 Subject: [PATCH] YARN-1893. Mark AtMostOnce annotation to ApplicationMasterProtocol#allocate. Contributed by Xuan Gong. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1583203 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-yarn-project/CHANGES.txt | 3 + .../yarn/api/ApplicationMasterProtocol.java | 2 + .../yarn/client/ProtocolHATestBase.java | 57 ++++++++++- .../TestApplicationClientProtocolOnHA.java | 2 +- .../TestApplicationMasterServiceOnHA.java | 94 +++++++++++++++++++ .../yarn/client/TestResourceTrackerOnHA.java | 2 +- .../hadoop/yarn/server/MiniYARNCluster.java | 4 + 7 files changed, 159 insertions(+), 5 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestApplicationMasterServiceOnHA.java diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 176b732f2b9..f17257be756 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -596,6 +596,9 @@ Release 2.4.0 - UNRELEASED YARN-1873. Fixed TestDistributedShell failure when the test cases are out of order. (Mit Desai via zjshen) + YARN-1893. Mark AtMostOnce annotation to ApplicationMasterProtocol#allocate. + (Xuan Gong via jianhe) + Release 2.3.1 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationMasterProtocol.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationMasterProtocol.java index fee72e8ec37..f4ae9bef954 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationMasterProtocol.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationMasterProtocol.java @@ -22,6 +22,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Stable; +import org.apache.hadoop.io.retry.AtMostOnce; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; @@ -162,6 +163,7 @@ public FinishApplicationMasterResponse finishApplicationMaster( */ @Public @Stable + @AtMostOnce public AllocateResponse allocate(AllocateRequest request) throws YarnException, IOException; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java index 4f3cab2c327..1f5bd058a91 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java @@ -33,6 +33,8 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.ha.ClientBaseWithFixes; import org.apache.hadoop.ha.HAServiceProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest; import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportRequest; @@ -67,14 +69,18 @@ import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse; +import org.apache.hadoop.yarn.api.records.AMCommand; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerReport; import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.NodeState; @@ -82,6 +88,7 @@ import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.QueueState; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState; import org.apache.hadoop.yarn.api.records.YarnApplicationState; @@ -96,6 +103,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse; import org.apache.hadoop.yarn.server.resourcemanager.AdminService; +import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService; import org.apache.hadoop.yarn.server.resourcemanager.ClientRMService; import org.apache.hadoop.yarn.server.resourcemanager.NMLivelinessMonitor; import org.apache.hadoop.yarn.server.resourcemanager.NodesListManager; @@ -257,11 +265,13 @@ public void run() { } protected void startHACluster(int numOfNMs, boolean overrideClientRMService, - boolean overrideRTS) throws Exception { + boolean overrideRTS, boolean overrideApplicationMasterService) + throws Exception { conf.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false); cluster = new MiniYARNClusterForHATesting(TestRMFailover.class.getName(), 2, - numOfNMs, 1, 1, false, overrideClientRMService, overrideRTS); + numOfNMs, 1, 1, false, overrideClientRMService, overrideRTS, + overrideApplicationMasterService); cluster.resetStartFailoverFlag(false); cluster.init(conf); cluster.start(); @@ -285,17 +295,19 @@ public class MiniYARNClusterForHATesting extends MiniYARNCluster { private boolean overrideClientRMService; private boolean overrideRTS; + private boolean overrideApplicationMasterService; private final AtomicBoolean startFailover = new AtomicBoolean(false); private final AtomicBoolean failoverTriggered = new AtomicBoolean(false); public MiniYARNClusterForHATesting(String testName, int numResourceManagers, int numNodeManagers, int numLocalDirs, int numLogDirs, boolean enableAHS, boolean overrideClientRMService, - boolean overrideRTS) { + boolean overrideRTS, boolean overrideApplicationMasterService) { super(testName, numResourceManagers, numNodeManagers, numLocalDirs, numLogDirs, enableAHS); this.overrideClientRMService = overrideClientRMService; this.overrideRTS = overrideRTS; + this.overrideApplicationMasterService = overrideApplicationMasterService; } public boolean getStartFailoverFlag() { @@ -324,6 +336,11 @@ private boolean waittingForFailOver() { if (count >= maximumWaittingTime) { return false; } + try { + Thread.sleep(100); + } catch (InterruptedException e) { + // DO NOTHING + } return true; } @@ -354,6 +371,14 @@ protected ResourceTrackerService createResourceTrackerService() { } return super.createResourceTrackerService(); } + @Override + protected ApplicationMasterService createApplicationMasterService() { + if (overrideApplicationMasterService) { + return new CustomedApplicationMasterService(this.rmContext, + this.scheduler); + } + return super.createApplicationMasterService(); + } }; } @@ -717,5 +742,31 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) return super.nodeHeartbeat(request); } } + + private class CustomedApplicationMasterService extends + ApplicationMasterService { + public CustomedApplicationMasterService(RMContext rmContext, + YarnScheduler scheduler) { + super(rmContext, scheduler); + } + + @Override + public AllocateResponse allocate(AllocateRequest request) + throws YarnException, IOException { + resetStartFailoverFlag(true); + // make sure failover has been triggered + Assert.assertTrue(waittingForFailOver()); + return createFakeAllocateResponse(); + } + + } + + public AllocateResponse createFakeAllocateResponse() { + return AllocateResponse.newInstance(-1, + new ArrayList(), + new ArrayList(), new ArrayList(), + Resource.newInstance(1024, 2), AMCommand.AM_RESYNC, 1, + null, new ArrayList()); + } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestApplicationClientProtocolOnHA.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestApplicationClientProtocolOnHA.java index 2aa6cc639bc..bfc6656c2d4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestApplicationClientProtocolOnHA.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestApplicationClientProtocolOnHA.java @@ -51,7 +51,7 @@ public class TestApplicationClientProtocolOnHA extends ProtocolHATestBase { @Before public void initiate() throws Exception { - startHACluster(1, true, false); + startHACluster(1, true, false, false); Configuration conf = new YarnConfiguration(this.conf); client = createAndStartYarnClient(conf); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestApplicationMasterServiceOnHA.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestApplicationMasterServiceOnHA.java new file mode 100644 index 00000000000..13020e8ba41 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestApplicationMasterServiceOnHA.java @@ -0,0 +1,94 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn.client; + +import java.io.IOException; +import java.util.ArrayList; + +import junit.framework.Assert; + +import org.apache.hadoop.io.Text; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + + +public class TestApplicationMasterServiceOnHA extends ProtocolHATestBase{ + private ApplicationMasterProtocol amClient; + private ApplicationAttemptId attemptId ; + RMAppAttempt appAttempt; + + @Before + public void initiate() throws Exception { + startHACluster(0, false, false, true); + attemptId = this.cluster.createFakeApplicationAttemptId(); + amClient = ClientRMProxy + .createRMProxy(this.conf, ApplicationMasterProtocol.class); + + AMRMTokenIdentifier id = + new AMRMTokenIdentifier(attemptId); + Token appToken = + new Token(id, this.cluster.getResourceManager() + .getRMContext().getAMRMTokenSecretManager()); + appToken.setService(new Text("appToken service")); + UserGroupInformation.setLoginUser(UserGroupInformation + .createRemoteUser(UserGroupInformation.getCurrentUser() + .getUserName())); + UserGroupInformation.getCurrentUser().addToken(appToken); + syncToken(appToken); + } + + @After + public void shutDown() { + if(this.amClient != null) { + RPC.stopProxy(this.amClient); + } + } + + @Test(timeout = 15000) + public void testAllocateOnHA() throws YarnException, IOException { + AllocateRequest request = AllocateRequest.newInstance(0, 50f, + new ArrayList(), + new ArrayList(), + ResourceBlacklistRequest.newInstance(new ArrayList(), + new ArrayList())); + AllocateResponse response = amClient.allocate(request); + Assert.assertEquals(response, this.cluster.createFakeAllocateResponse()); + } + + private void syncToken(Token token) throws IOException { + for (int i = 0; i < this.cluster.getNumOfResourceManager(); i++) { + this.cluster.getResourceManager(i).getRMContext() + .getAMRMTokenSecretManager().addPersistedPassword(token); + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceTrackerOnHA.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceTrackerOnHA.java index f2d8bc283d2..498dbe30035 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceTrackerOnHA.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceTrackerOnHA.java @@ -41,7 +41,7 @@ public class TestResourceTrackerOnHA extends ProtocolHATestBase{ @Before public void initiate() throws Exception { - startHACluster(0, false, true); + startHACluster(0, false, true, false); this.resourceTracker = getRMClient(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java index 8632815f33f..f6416641344 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java @@ -739,4 +739,8 @@ protected void doSecureLogin() throws IOException { } }; } + + public int getNumOfResourceManager() { + return this.resourceManagers.length; + } }