YARN-986. Changed client side to be able to figure out the right RM Delegation token for the right ResourceManager when HA is enabled. Contributed by Karthik Kambatla.

svn merge --ignore-ancestry -c 1574190 ../../trunk/


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1574191 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2014-03-04 20:39:33 +00:00
parent 45f24de631
commit 7c17cff55c
14 changed files with 177 additions and 30 deletions

View File

@ -19,7 +19,6 @@
package org.apache.hadoop.mapred;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
@ -56,6 +55,7 @@ import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
import org.apache.hadoop.yarn.client.ClientRMProxy;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.client.api.YarnClientApplication;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@ -74,7 +74,7 @@ public class ResourceMgrDelegate extends YarnClient {
@Private
@VisibleForTesting
protected YarnClient client;
private InetSocketAddress rmAddress;
private Text rmDTService;
/**
* Delegate responsible for communicating with the Resource Manager's
@ -91,9 +91,6 @@ public class ResourceMgrDelegate extends YarnClient {
@Override
protected void serviceInit(Configuration conf) throws Exception {
this.rmAddress = conf.getSocketAddr(YarnConfiguration.RM_ADDRESS,
YarnConfiguration.DEFAULT_RM_ADDRESS,
YarnConfiguration.DEFAULT_RM_PORT);
client.init(conf);
super.serviceInit(conf);
}
@ -155,8 +152,11 @@ public class ResourceMgrDelegate extends YarnClient {
}
}
InetSocketAddress getConnectAddress() {
return rmAddress;
public Text getRMDelegationTokenService() {
if (rmDTService == null) {
rmDTService = ClientRMProxy.getRMDelegationTokenService(conf);
}
return rmDTService;
}
@SuppressWarnings("rawtypes")
@ -164,7 +164,7 @@ public class ResourceMgrDelegate extends YarnClient {
InterruptedException {
try {
return ConverterUtils.convertFromYarn(
client.getRMDelegationToken(renewer), rmAddress);
client.getRMDelegationToken(renewer), getRMDelegationTokenService());
} catch (YarnException e) {
throw new IOException(e);
}

View File

@ -188,8 +188,7 @@ public class YARNRunner implements ClientProtocol {
* to make sure we add history server delegation tokens to the credentials
*/
RMDelegationTokenSelector tokenSelector = new RMDelegationTokenSelector();
Text service = SecurityUtil.buildTokenService(resMgrDelegate
.getConnectAddress());
Text service = resMgrDelegate.getRMDelegationTokenService();
if (tokenSelector.selectToken(service, ts.getAllTokens()) != null) {
Text hsService = SecurityUtil.buildTokenService(hsProxy
.getConnectAddress());

View File

@ -299,7 +299,7 @@ public class TestYARNRunner extends TestCase {
any(GetDelegationTokenRequest.class));
ResourceMgrDelegate rmDelegate = mock(ResourceMgrDelegate.class);
doReturn(mockRmAddress).when(rmDelegate).getConnectAddress();
doReturn(rmTokenSevice).when(rmDelegate).getRMDelegationTokenService();
ClientCache clientCache = mock(ClientCache.class);
doReturn(mockHsProxy).when(clientCache).getInitializedHSProxy();

View File

@ -237,6 +237,10 @@ Release 2.4.0 - UNRELEASED
YARN-1730. Implemented simple write-locking in the LevelDB based timeline-
store. (Billie Rinaldi via vinodkv)
YARN-986. Changed client side to be able to figure out the right RM Delegation
token for the right ResourceManager when HA is enabled. (Karthik Kambatla via
vinodkv)
OPTIMIZATIONS
BUG FIXES

View File

@ -1170,7 +1170,9 @@ public class YarnConfiguration extends Configuration {
/**
* Get the socket address for <code>name</code> property as a
* <code>InetSocketAddress</code>.
* <code>InetSocketAddress</code>. On a HA cluster,
* this fetches the address corresponding to the RM identified by
* {@link #RM_HA_ID}.
* @param name property name.
* @param defaultAddress the default value
* @param defaultPort the default port
@ -1227,4 +1229,14 @@ public class YarnConfiguration extends Configuration {
.get(YARN_HTTP_POLICY_KEY,
YARN_HTTP_POLICY_DEFAULT));
}
@Private
public static String getClusterId(Configuration conf) {
String clusterId = conf.get(YarnConfiguration.RM_CLUSTER_ID);
if (clusterId == null) {
throw new HadoopIllegalArgumentException("Configuration doesn't specify" +
YarnConfiguration.RM_CLUSTER_ID);
}
return clusterId;
}
}

View File

@ -290,7 +290,7 @@ public class YarnClientImpl extends YarnClient {
org.apache.hadoop.security.token.Token<AMRMTokenIdentifier> amrmToken =
null;
if (token != null) {
amrmToken = ConverterUtils.convertFromYarn(token, null);
amrmToken = ConverterUtils.convertFromYarn(token, (Text) null);
}
return amrmToken;
}

View File

@ -20,23 +20,30 @@ package org.apache.hadoop.yarn.client;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import com.google.common.base.Joiner;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.conf.HAUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol;
import com.google.common.base.Preconditions;
@InterfaceAudience.Public
@InterfaceStability.Stable
public class ClientRMProxy<T> extends RMProxy<T> {
private static final Log LOG = LogFactory.getLog(ClientRMProxy.class);
private static final ClientRMProxy INSTANCE = new ClientRMProxy();
@ -67,7 +74,7 @@ public class ClientRMProxy<T> extends RMProxy<T> {
throws IOException {
// It is assumed for now that the only AMRMToken in AM's UGI is for this
// cluster/RM. TODO: Fix later when we have some kind of cluster-ID as
// default service-address, see YARN-986.
// default service-address, see YARN-1779.
for (Token<? extends TokenIdentifier> token : UserGroupInformation
.getCurrentUser().getTokens()) {
if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) {
@ -115,4 +122,37 @@ public class ClientRMProxy<T> extends RMProxy<T> {
protocol.isAssignableFrom(ClientRMProtocols.class),
"RM does not support this client protocol");
}
/**
* Get the token service name to be used for RMDelegationToken. Depending
* on whether HA is enabled or not, this method generates the appropriate
* service name as a comma-separated list of service addresses.
*
* @param conf Configuration corresponding to the cluster we need the
* RMDelegationToken for
* @return - Service name for RMDelegationToken
*/
@InterfaceStability.Unstable
public static Text getRMDelegationTokenService(Configuration conf) {
if (HAUtil.isHAEnabled(conf)) {
// Build a list of service addresses to form the service name
ArrayList<String> services = new ArrayList<String>();
YarnConfiguration yarnConf = new YarnConfiguration(conf);
for (String rmId : HAUtil.getRMHAIds(conf)) {
// Set RM_ID to get the corresponding RM_ADDRESS
yarnConf.set(YarnConfiguration.RM_HA_ID, rmId);
services.add(SecurityUtil.buildTokenService(
yarnConf.getSocketAddr(YarnConfiguration.RM_ADDRESS,
YarnConfiguration.DEFAULT_RM_ADDRESS,
YarnConfiguration.DEFAULT_RM_PORT)).toString());
}
return new Text(Joiner.on(',').join(services));
}
// Non-HA case - no need to set RM_ID
return SecurityUtil.buildTokenService(
conf.getSocketAddr(YarnConfiguration.RM_ADDRESS,
YarnConfiguration.DEFAULT_RM_ADDRESS,
YarnConfiguration.DEFAULT_RM_PORT));
}
}

View File

@ -38,6 +38,7 @@ import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest;
import org.apache.hadoop.yarn.client.ClientRMProxy;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.util.Records;
@ -139,16 +140,19 @@ public class RMDelegationTokenIdentifier extends AbstractDelegationTokenIdentifi
private static ApplicationClientProtocol getRmClient(Token<?> token,
Configuration conf) throws IOException {
InetSocketAddress addr = SecurityUtil.getTokenServiceAddr(token);
if (localSecretManager != null) {
// return null if it's our token
if (localServiceAddress.getAddress().isAnyLocalAddress()) {
String[] services = token.getService().toString().split(",");
for (String service : services) {
InetSocketAddress addr = NetUtils.createSocketAddr(service);
if (localSecretManager != null) {
// return null if it's our token
if (localServiceAddress.getAddress().isAnyLocalAddress()) {
if (NetUtils.isLocalAddress(addr.getAddress()) &&
addr.getPort() == localServiceAddress.getPort()) {
return null;
}
} else if (addr.equals(localServiceAddress)) {
return null;
} else if (addr.equals(localServiceAddress)) {
return null;
}
}
}
return ClientRMProxy.createRMProxy(conf, ApplicationClientProtocol.class);

View File

@ -37,6 +37,14 @@ public class RMDelegationTokenSelector implements
private static final Log LOG = LogFactory
.getLog(RMDelegationTokenSelector.class);
private boolean checkService(Text service,
Token<? extends TokenIdentifier> token) {
if (service == null || token.getService() == null) {
return false;
}
return token.getService().toString().contains(service.toString());
}
@SuppressWarnings("unchecked")
public Token<RMDelegationTokenIdentifier> selectToken(Text service,
Collection<Token<? extends TokenIdentifier>> tokens) {
@ -48,7 +56,7 @@ public class RMDelegationTokenSelector implements
LOG.debug("Token kind is " + token.getKind().toString()
+ " and the token's service name is " + token.getService());
if (RMDelegationTokenIdentifier.KIND_NAME.equals(token.getKind())
&& service.equals(token.getService())) {
&& checkService(service, token)) {
return (Token<RMDelegationTokenIdentifier>) token;
}
}

View File

@ -216,8 +216,12 @@ public class ConverterUtils {
}
/**
* Convert a protobuf token into a rpc token and set its service
*
* Convert a protobuf token into a rpc token and set its service. Supposed
* to be used for tokens other than RMDelegationToken. For
* RMDelegationToken, use
* {@link #convertFromYarn(org.apache.hadoop.yarn.api.records.Token,
* org.apache.hadoop.io.Text)} instead.
*
* @param protoToken the yarn token
* @param serviceAddr the connect address for the service
* @return rpc token
@ -234,4 +238,24 @@ public class ConverterUtils {
}
return token;
}
/**
* Convert a protobuf token into a rpc token and set its service.
*
* @param protoToken the yarn token
* @param service the service for the token
*/
public static <T extends TokenIdentifier> Token<T> convertFromYarn(
org.apache.hadoop.yarn.api.records.Token protoToken,
Text service) {
Token<T> token = new Token<T>(protoToken.getIdentifier().array(),
protoToken.getPassword().array(),
new Text(protoToken.getKind()),
new Text(protoToken.getService()));
if (service != null) {
token.setService(service);
}
return token;
}
}

View File

@ -0,0 +1,59 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.client;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.yarn.conf.HAUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
public class TestClientRMProxy {
@Test
public void testGetRMDelegationTokenService() {
String defaultRMAddress = YarnConfiguration.DEFAULT_RM_ADDRESS;
YarnConfiguration conf = new YarnConfiguration();
// HA is not enabled
Text tokenService = ClientRMProxy.getRMDelegationTokenService(conf);
String[] services = tokenService.toString().split(",");
assertEquals(1, services.length);
for (String service : services) {
assertTrue("Incorrect token service name",
service.contains(defaultRMAddress));
}
// HA is enabled
conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
conf.set(YarnConfiguration.RM_HA_IDS, "rm1,rm2");
conf.set(HAUtil.addSuffix(YarnConfiguration.RM_HOSTNAME, "rm1"),
"0.0.0.0");
conf.set(HAUtil.addSuffix(YarnConfiguration.RM_HOSTNAME, "rm2"),
"0.0.0.0");
tokenService = ClientRMProxy.getRMDelegationTokenService(conf);
services = tokenService.toString().split(",");
assertEquals(2, services.length);
for (String service : services) {
assertTrue("Incorrect token service name",
service.contains(defaultRMAddress));
}
}
}

View File

@ -72,11 +72,7 @@ public class EmbeddedElectorService extends AbstractService
}
String rmId = HAUtil.getRMHAId(conf);
String clusterId = conf.get(YarnConfiguration.RM_CLUSTER_ID);
if (clusterId == null) {
throw new YarnRuntimeException(YarnConfiguration.RM_CLUSTER_ID +
" is not specified!");
}
String clusterId = YarnConfiguration.getClusterId(conf);
localActiveNodeInfo = createActiveNodeInfo(clusterId, rmId);
String zkBasePath = conf.get(YarnConfiguration.AUTO_FAILOVER_ZK_BASE_PATH,

View File

@ -548,7 +548,7 @@ public abstract class RMStateStore extends AbstractService {
ApplicationState appState) throws Exception;
// TODO: This should eventually become cluster-Id + "AM_RM_TOKEN_SERVICE". See
// YARN-986
// YARN-1779
public static final Text AM_RM_TOKEN_SERVICE = new Text(
"AM_RM_TOKEN_SERVICE");

View File

@ -27,6 +27,7 @@ import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.delegation.DelegationKey;
@ -102,7 +103,7 @@ public class TestRMDelegationTokens {
org.apache.hadoop.yarn.api.records.Token delegationToken =
response.getRMDelegationToken();
Token<RMDelegationTokenIdentifier> token1 =
ConverterUtils.convertFromYarn(delegationToken, null);
ConverterUtils.convertFromYarn(delegationToken, (Text) null);
RMDelegationTokenIdentifier dtId1 = token1.decodeIdentifier();
// wait for the first rollMasterKey