From 514bfeafd93ee13c0add207d7a7a3ef946a462f7 Mon Sep 17 00:00:00 2001 From: Vinod Kumar Vavilapalli Date: Thu, 18 Sep 2014 10:16:18 -0700 Subject: [PATCH] YARN-1779. Fixed AMRMClient to handle AMRMTokens correctly across ResourceManager work-preserving-restart or failover. Contributed by Jian He. (cherry picked from commit a3d9934f916471a845dc679449d08f94dead550d) --- hadoop-yarn-project/CHANGES.txt | 3 + .../TestUnmanagedAMLauncher.java | 2 +- .../yarn/client/api/impl/AMRMClientImpl.java | 1 + .../TestApplicationMasterServiceOnHA.java | 2 +- .../yarn/client/api/impl/TestAMRMClient.java | 2 + .../hadoop/yarn/client/ClientRMProxy.java | 56 ++++++++++--------- .../yarn/security/AMRMTokenSelector.java | 9 ++- .../hadoop/yarn/client/TestClientRMProxy.java | 30 ++++++++++ 8 files changed, 77 insertions(+), 28 deletions(-) diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 27e92bede15..b98b42e224c 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -353,6 +353,9 @@ Release 2.6.0 - UNRELEASED YARN-2559. Fixed NPE in SystemMetricsPublisher when retrieving FinalApplicationStatus. (Zhijie Shen via jianhe) + YARN-1779. Fixed AMRMClient to handle AMRMTokens correctly across + ResourceManager work-preserving-restart or failover. (Jian He via vinodkv) + Release 2.5.1 - 2014-09-05 INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/test/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/TestUnmanagedAMLauncher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/test/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/TestUnmanagedAMLauncher.java index 08cacee2b24..3aba01a0efe 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/test/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/TestUnmanagedAMLauncher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/test/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/TestUnmanagedAMLauncher.java @@ -53,7 +53,7 @@ public class TestUnmanagedAMLauncher { .getLog(TestUnmanagedAMLauncher.class); protected static MiniYARNCluster yarnCluster = null; - protected static Configuration conf = new Configuration(); + protected static Configuration conf = new YarnConfiguration(); @BeforeClass public static void setup() throws InterruptedException, IOException { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java index e36d7ade78c..88b2f456a89 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java @@ -756,6 +756,7 @@ public class AMRMClientImpl extends AMRMClient { new org.apache.hadoop.security.token.Token(token .getIdentifier().array(), token.getPassword().array(), new Text( token.getKind()), new Text(token.getService())); + amrmToken.setService(ClientRMProxy.getAMRMTokenService(getConfig())); UserGroupInformation currentUGI = UserGroupInformation.getCurrentUser(); if (UserGroupInformation.isSecurityEnabled()) { currentUGI = UserGroupInformation.getLoginUser(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestApplicationMasterServiceOnHA.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestApplicationMasterServiceOnHA.java index 0b42ac3c6b9..5b12940ce59 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestApplicationMasterServiceOnHA.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestApplicationMasterServiceOnHA.java @@ -57,7 +57,7 @@ public class TestApplicationMasterServiceOnHA extends ProtocolHATestBase{ Token appToken = this.cluster.getResourceManager().getRMContext() .getAMRMTokenSecretManager().createAndGetAMRMToken(attemptId); - appToken.setService(new Text("appToken service")); + appToken.setService(ClientRMProxy.getAMRMTokenService(conf)); UserGroupInformation.setLoginUser(UserGroupInformation .createRemoteUser(UserGroupInformation.getCurrentUser() .getUserName())); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java index 38dbf79da99..a434e35a9f2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java @@ -70,6 +70,7 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.api.records.YarnApplicationState; +import org.apache.hadoop.yarn.client.ClientRMProxy; import org.apache.hadoop.yarn.client.api.AMRMClient; import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; import org.apache.hadoop.yarn.client.api.NMTokenCache; @@ -196,6 +197,7 @@ public class TestAMRMClient { // of testing. UserGroupInformation.setLoginUser(UserGroupInformation .createRemoteUser(UserGroupInformation.getCurrentUser().getUserName())); + appAttempt.getAMRMToken().setService(ClientRMProxy.getAMRMTokenService(conf)); UserGroupInformation.getCurrentUser().addToken(appAttempt.getAMRMToken()); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/ClientRMProxy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/ClientRMProxy.java index 3434755274e..b29263edcd8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/ClientRMProxy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/ClientRMProxy.java @@ -22,11 +22,12 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.util.ArrayList; -import com.google.common.base.Joiner; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Text; import org.apache.hadoop.security.SecurityUtil; @@ -40,6 +41,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol; +import com.google.common.base.Joiner; import com.google.common.base.Preconditions; @InterfaceAudience.Public @@ -70,23 +72,17 @@ public class ClientRMProxy extends RMProxy { return createRMProxy(configuration, protocol, INSTANCE); } - private static void setupTokens(InetSocketAddress resourceManagerAddress) + private static void setAMRMTokenService(final Configuration conf) throws IOException { - // It is assumed for now that the only AMRMToken in AM's UGI is for this - // cluster/RM. TODO: Fix later when we have some kind of cluster-ID as - // default service-address, see YARN-1779. for (Token token : UserGroupInformation .getCurrentUser().getTokens()) { if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) { - // This token needs to be directly provided to the AMs, so set the - // appropriate service-name. We'll need more infrastructure when we - // need to set it in HA case. - SecurityUtil.setTokenService(token, resourceManagerAddress); + token.setService(getAMRMTokenService(conf)); } } } - @InterfaceAudience.Private + @Private @Override protected InetSocketAddress getRMAddress(YarnConfiguration conf, Class protocol) throws IOException { @@ -100,12 +96,10 @@ public class ClientRMProxy extends RMProxy { YarnConfiguration.DEFAULT_RM_ADMIN_ADDRESS, YarnConfiguration.DEFAULT_RM_ADMIN_PORT); } else if (protocol == ApplicationMasterProtocol.class) { - InetSocketAddress serviceAddr = - conf.getSocketAddr(YarnConfiguration.RM_SCHEDULER_ADDRESS, - YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS, - YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT); - setupTokens(serviceAddr); - return serviceAddr; + setAMRMTokenService(conf); + return conf.getSocketAddr(YarnConfiguration.RM_SCHEDULER_ADDRESS, + YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS, + YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT); } else { String message = "Unsupported protocol found when creating the proxy " + "connection to ResourceManager: " + @@ -115,7 +109,7 @@ public class ClientRMProxy extends RMProxy { } } - @InterfaceAudience.Private + @Private @Override protected void checkAllowedProtocols(Class protocol) { Preconditions.checkArgument( @@ -132,8 +126,23 @@ public class ClientRMProxy extends RMProxy { * RMDelegationToken for * @return - Service name for RMDelegationToken */ - @InterfaceStability.Unstable + @Unstable public static Text getRMDelegationTokenService(Configuration conf) { + return getTokenService(conf, YarnConfiguration.RM_ADDRESS, + YarnConfiguration.DEFAULT_RM_ADDRESS, + YarnConfiguration.DEFAULT_RM_PORT); + } + + @Unstable + public static Text getAMRMTokenService(Configuration conf) { + return getTokenService(conf, YarnConfiguration.RM_SCHEDULER_ADDRESS, + YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS, + YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT); + } + + @Unstable + public static Text getTokenService(Configuration conf, String address, + String defaultAddr, int defaultPort) { if (HAUtil.isHAEnabled(conf)) { // Build a list of service addresses to form the service name ArrayList services = new ArrayList(); @@ -142,17 +151,14 @@ public class ClientRMProxy extends RMProxy { // Set RM_ID to get the corresponding RM_ADDRESS yarnConf.set(YarnConfiguration.RM_HA_ID, rmId); services.add(SecurityUtil.buildTokenService( - yarnConf.getSocketAddr(YarnConfiguration.RM_ADDRESS, - YarnConfiguration.DEFAULT_RM_ADDRESS, - YarnConfiguration.DEFAULT_RM_PORT)).toString()); + yarnConf.getSocketAddr(address, defaultAddr, defaultPort)) + .toString()); } return new Text(Joiner.on(',').join(services)); } // Non-HA case - no need to set RM_ID - return SecurityUtil.buildTokenService( - conf.getSocketAddr(YarnConfiguration.RM_ADDRESS, - YarnConfiguration.DEFAULT_RM_ADDRESS, - YarnConfiguration.DEFAULT_RM_PORT)); + return SecurityUtil.buildTokenService(conf.getSocketAddr(address, + defaultAddr, defaultPort)); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/AMRMTokenSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/AMRMTokenSelector.java index 469383963ec..be3701d7048 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/AMRMTokenSelector.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/AMRMTokenSelector.java @@ -48,11 +48,18 @@ public class AMRMTokenSelector implements LOG.debug("Token kind is " + token.getKind().toString() + " and the token's service name is " + token.getService()); if (AMRMTokenIdentifier.KIND_NAME.equals(token.getKind()) - && service.equals(token.getService())) { + && checkService(service, token)) { return (Token) token; } } return null; } + private boolean checkService(Text service, + Token token) { + if (service == null || token.getService() == null) { + return false; + } + return token.getService().toString().contains(service.toString()); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/TestClientRMProxy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/TestClientRMProxy.java index 1a252abf5b3..700a37ff31b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/TestClientRMProxy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/TestClientRMProxy.java @@ -56,4 +56,34 @@ public class TestClientRMProxy { service.contains(defaultRMAddress)); } } + + @Test + public void testGetAMRMTokenService() { + String defaultRMAddress = YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS; + YarnConfiguration conf = new YarnConfiguration(); + + // HA is not enabled + Text tokenService = ClientRMProxy.getAMRMTokenService(conf); + String[] services = tokenService.toString().split(","); + assertEquals(1, services.length); + for (String service : services) { + assertTrue("Incorrect token service name", + service.contains(defaultRMAddress)); + } + + // HA is enabled + conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true); + conf.set(YarnConfiguration.RM_HA_IDS, "rm1,rm2"); + conf.set(HAUtil.addSuffix(YarnConfiguration.RM_HOSTNAME, "rm1"), + "0.0.0.0"); + conf.set(HAUtil.addSuffix(YarnConfiguration.RM_HOSTNAME, "rm2"), + "0.0.0.0"); + tokenService = ClientRMProxy.getAMRMTokenService(conf); + services = tokenService.toString().split(","); + assertEquals(2, services.length); + for (String service : services) { + assertTrue("Incorrect token service name", + service.contains(defaultRMAddress)); + } + } }