YARN-1779. Fixed AMRMClient to handle AMRMTokens correctly across ResourceManager work-preserving-restart or failover. Contributed by Jian He.

This commit is contained in:
Vinod Kumar Vavilapalli 2014-09-18 10:16:18 -07:00
parent ee21b13cbd
commit a3d9934f91
8 changed files with 77 additions and 28 deletions

View File

@ -383,6 +383,9 @@ Release 2.6.0 - UNRELEASED
YARN-2559. Fixed NPE in SystemMetricsPublisher when retrieving
FinalApplicationStatus. (Zhijie Shen via jianhe)
YARN-1779. Fixed AMRMClient to handle AMRMTokens correctly across
ResourceManager work-preserving-restart or failover. (Jian He via vinodkv)
Release 2.5.1 - 2014-09-05
INCOMPATIBLE CHANGES

View File

@ -53,7 +53,7 @@ public class TestUnmanagedAMLauncher {
.getLog(TestUnmanagedAMLauncher.class);
protected static MiniYARNCluster yarnCluster = null;
protected static Configuration conf = new Configuration();
protected static Configuration conf = new YarnConfiguration();
@BeforeClass
public static void setup() throws InterruptedException, IOException {

View File

@ -756,6 +756,7 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
new org.apache.hadoop.security.token.Token<AMRMTokenIdentifier>(token
.getIdentifier().array(), token.getPassword().array(), new Text(
token.getKind()), new Text(token.getService()));
amrmToken.setService(ClientRMProxy.getAMRMTokenService(getConfig()));
UserGroupInformation currentUGI = UserGroupInformation.getCurrentUser();
if (UserGroupInformation.isSecurityEnabled()) {
currentUGI = UserGroupInformation.getLoginUser();

View File

@ -57,7 +57,7 @@ public class TestApplicationMasterServiceOnHA extends ProtocolHATestBase{
Token<AMRMTokenIdentifier> appToken =
this.cluster.getResourceManager().getRMContext()
.getAMRMTokenSecretManager().createAndGetAMRMToken(attemptId);
appToken.setService(new Text("appToken service"));
appToken.setService(ClientRMProxy.getAMRMTokenService(conf));
UserGroupInformation.setLoginUser(UserGroupInformation
.createRemoteUser(UserGroupInformation.getCurrentUser()
.getUserName()));

View File

@ -70,6 +70,7 @@ import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.ClientRMProxy;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.client.api.NMTokenCache;
@ -196,6 +197,7 @@ public class TestAMRMClient {
// of testing.
UserGroupInformation.setLoginUser(UserGroupInformation
.createRemoteUser(UserGroupInformation.getCurrentUser().getUserName()));
appAttempt.getAMRMToken().setService(ClientRMProxy.getAMRMTokenService(conf));
UserGroupInformation.getCurrentUser().addToken(appAttempt.getAMRMToken());
}

View File

@ -22,11 +22,12 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import com.google.common.base.Joiner;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.SecurityUtil;
@ -40,6 +41,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
@InterfaceAudience.Public
@ -70,23 +72,17 @@ public class ClientRMProxy<T> extends RMProxy<T> {
return createRMProxy(configuration, protocol, INSTANCE);
}
private static void setupTokens(InetSocketAddress resourceManagerAddress)
private static void setAMRMTokenService(final Configuration conf)
throws IOException {
// It is assumed for now that the only AMRMToken in AM's UGI is for this
// cluster/RM. TODO: Fix later when we have some kind of cluster-ID as
// default service-address, see YARN-1779.
for (Token<? extends TokenIdentifier> token : UserGroupInformation
.getCurrentUser().getTokens()) {
if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) {
// This token needs to be directly provided to the AMs, so set the
// appropriate service-name. We'll need more infrastructure when we
// need to set it in HA case.
SecurityUtil.setTokenService(token, resourceManagerAddress);
token.setService(getAMRMTokenService(conf));
}
}
}
@InterfaceAudience.Private
@Private
@Override
protected InetSocketAddress getRMAddress(YarnConfiguration conf,
Class<?> protocol) throws IOException {
@ -100,12 +96,10 @@ public class ClientRMProxy<T> extends RMProxy<T> {
YarnConfiguration.DEFAULT_RM_ADMIN_ADDRESS,
YarnConfiguration.DEFAULT_RM_ADMIN_PORT);
} else if (protocol == ApplicationMasterProtocol.class) {
InetSocketAddress serviceAddr =
conf.getSocketAddr(YarnConfiguration.RM_SCHEDULER_ADDRESS,
YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT);
setupTokens(serviceAddr);
return serviceAddr;
setAMRMTokenService(conf);
return conf.getSocketAddr(YarnConfiguration.RM_SCHEDULER_ADDRESS,
YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT);
} else {
String message = "Unsupported protocol found when creating the proxy " +
"connection to ResourceManager: " +
@ -115,7 +109,7 @@ public class ClientRMProxy<T> extends RMProxy<T> {
}
}
@InterfaceAudience.Private
@Private
@Override
protected void checkAllowedProtocols(Class<?> protocol) {
Preconditions.checkArgument(
@ -132,8 +126,23 @@ public class ClientRMProxy<T> extends RMProxy<T> {
* RMDelegationToken for
* @return - Service name for RMDelegationToken
*/
@InterfaceStability.Unstable
@Unstable
public static Text getRMDelegationTokenService(Configuration conf) {
return getTokenService(conf, YarnConfiguration.RM_ADDRESS,
YarnConfiguration.DEFAULT_RM_ADDRESS,
YarnConfiguration.DEFAULT_RM_PORT);
}
@Unstable
public static Text getAMRMTokenService(Configuration conf) {
return getTokenService(conf, YarnConfiguration.RM_SCHEDULER_ADDRESS,
YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT);
}
@Unstable
public static Text getTokenService(Configuration conf, String address,
String defaultAddr, int defaultPort) {
if (HAUtil.isHAEnabled(conf)) {
// Build a list of service addresses to form the service name
ArrayList<String> services = new ArrayList<String>();
@ -142,17 +151,14 @@ public class ClientRMProxy<T> extends RMProxy<T> {
// Set RM_ID to get the corresponding RM_ADDRESS
yarnConf.set(YarnConfiguration.RM_HA_ID, rmId);
services.add(SecurityUtil.buildTokenService(
yarnConf.getSocketAddr(YarnConfiguration.RM_ADDRESS,
YarnConfiguration.DEFAULT_RM_ADDRESS,
YarnConfiguration.DEFAULT_RM_PORT)).toString());
yarnConf.getSocketAddr(address, defaultAddr, defaultPort))
.toString());
}
return new Text(Joiner.on(',').join(services));
}
// Non-HA case - no need to set RM_ID
return SecurityUtil.buildTokenService(
conf.getSocketAddr(YarnConfiguration.RM_ADDRESS,
YarnConfiguration.DEFAULT_RM_ADDRESS,
YarnConfiguration.DEFAULT_RM_PORT));
return SecurityUtil.buildTokenService(conf.getSocketAddr(address,
defaultAddr, defaultPort));
}
}

View File

@ -48,11 +48,18 @@ public class AMRMTokenSelector implements
LOG.debug("Token kind is " + token.getKind().toString()
+ " and the token's service name is " + token.getService());
if (AMRMTokenIdentifier.KIND_NAME.equals(token.getKind())
&& service.equals(token.getService())) {
&& checkService(service, token)) {
return (Token<AMRMTokenIdentifier>) token;
}
}
return null;
}
private boolean checkService(Text service,
Token<? extends TokenIdentifier> token) {
if (service == null || token.getService() == null) {
return false;
}
return token.getService().toString().contains(service.toString());
}
}

View File

@ -56,4 +56,34 @@ public class TestClientRMProxy {
service.contains(defaultRMAddress));
}
}
@Test
public void testGetAMRMTokenService() {
String defaultRMAddress = YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS;
YarnConfiguration conf = new YarnConfiguration();
// HA is not enabled
Text tokenService = ClientRMProxy.getAMRMTokenService(conf);
String[] services = tokenService.toString().split(",");
assertEquals(1, services.length);
for (String service : services) {
assertTrue("Incorrect token service name",
service.contains(defaultRMAddress));
}
// HA is enabled
conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
conf.set(YarnConfiguration.RM_HA_IDS, "rm1,rm2");
conf.set(HAUtil.addSuffix(YarnConfiguration.RM_HOSTNAME, "rm1"),
"0.0.0.0");
conf.set(HAUtil.addSuffix(YarnConfiguration.RM_HOSTNAME, "rm2"),
"0.0.0.0");
tokenService = ClientRMProxy.getAMRMTokenService(conf);
services = tokenService.toString().split(",");
assertEquals(2, services.length);
for (String service : services) {
assertTrue("Incorrect token service name",
service.contains(defaultRMAddress));
}
}
}