YARN-1779. Fixed AMRMClient to handle AMRMTokens correctly across ResourceManager work-preserving-restart or failover. Contributed by Jian He.

(cherry picked from commit a3d9934f91)
This commit is contained in:
Vinod Kumar Vavilapalli 2014-09-18 10:16:18 -07:00
parent d61cdd66db
commit 514bfeafd9
8 changed files with 77 additions and 28 deletions

View File

@ -353,6 +353,9 @@ Release 2.6.0 - UNRELEASED
YARN-2559. Fixed NPE in SystemMetricsPublisher when retrieving YARN-2559. Fixed NPE in SystemMetricsPublisher when retrieving
FinalApplicationStatus. (Zhijie Shen via jianhe) FinalApplicationStatus. (Zhijie Shen via jianhe)
YARN-1779. Fixed AMRMClient to handle AMRMTokens correctly across
ResourceManager work-preserving-restart or failover. (Jian He via vinodkv)
Release 2.5.1 - 2014-09-05 Release 2.5.1 - 2014-09-05
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -53,7 +53,7 @@ public class TestUnmanagedAMLauncher {
.getLog(TestUnmanagedAMLauncher.class); .getLog(TestUnmanagedAMLauncher.class);
protected static MiniYARNCluster yarnCluster = null; protected static MiniYARNCluster yarnCluster = null;
protected static Configuration conf = new Configuration(); protected static Configuration conf = new YarnConfiguration();
@BeforeClass @BeforeClass
public static void setup() throws InterruptedException, IOException { public static void setup() throws InterruptedException, IOException {

View File

@ -756,6 +756,7 @@ private void updateAMRMToken(Token token) throws IOException {
new org.apache.hadoop.security.token.Token<AMRMTokenIdentifier>(token new org.apache.hadoop.security.token.Token<AMRMTokenIdentifier>(token
.getIdentifier().array(), token.getPassword().array(), new Text( .getIdentifier().array(), token.getPassword().array(), new Text(
token.getKind()), new Text(token.getService())); token.getKind()), new Text(token.getService()));
amrmToken.setService(ClientRMProxy.getAMRMTokenService(getConfig()));
UserGroupInformation currentUGI = UserGroupInformation.getCurrentUser(); UserGroupInformation currentUGI = UserGroupInformation.getCurrentUser();
if (UserGroupInformation.isSecurityEnabled()) { if (UserGroupInformation.isSecurityEnabled()) {
currentUGI = UserGroupInformation.getLoginUser(); currentUGI = UserGroupInformation.getLoginUser();

View File

@ -57,7 +57,7 @@ public void initiate() throws Exception {
Token<AMRMTokenIdentifier> appToken = Token<AMRMTokenIdentifier> appToken =
this.cluster.getResourceManager().getRMContext() this.cluster.getResourceManager().getRMContext()
.getAMRMTokenSecretManager().createAndGetAMRMToken(attemptId); .getAMRMTokenSecretManager().createAndGetAMRMToken(attemptId);
appToken.setService(new Text("appToken service")); appToken.setService(ClientRMProxy.getAMRMTokenService(conf));
UserGroupInformation.setLoginUser(UserGroupInformation UserGroupInformation.setLoginUser(UserGroupInformation
.createRemoteUser(UserGroupInformation.getCurrentUser() .createRemoteUser(UserGroupInformation.getCurrentUser()
.getUserName())); .getUserName()));

View File

@ -70,6 +70,7 @@
import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.ClientRMProxy;
import org.apache.hadoop.yarn.client.api.AMRMClient; import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.client.api.NMTokenCache; import org.apache.hadoop.yarn.client.api.NMTokenCache;
@ -196,6 +197,7 @@ Collections.<String, LocalResource> emptyMap(),
// of testing. // of testing.
UserGroupInformation.setLoginUser(UserGroupInformation UserGroupInformation.setLoginUser(UserGroupInformation
.createRemoteUser(UserGroupInformation.getCurrentUser().getUserName())); .createRemoteUser(UserGroupInformation.getCurrentUser().getUserName()));
appAttempt.getAMRMToken().setService(ClientRMProxy.getAMRMTokenService(conf));
UserGroupInformation.getCurrentUser().addToken(appAttempt.getAMRMToken()); UserGroupInformation.getCurrentUser().addToken(appAttempt.getAMRMToken());
} }

View File

@ -22,11 +22,12 @@
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.ArrayList; import java.util.ArrayList;
import com.google.common.base.Joiner;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.SecurityUtil;
@ -40,6 +41,7 @@
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol; import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
@InterfaceAudience.Public @InterfaceAudience.Public
@ -70,23 +72,17 @@ public static <T> T createRMProxy(final Configuration configuration,
return createRMProxy(configuration, protocol, INSTANCE); return createRMProxy(configuration, protocol, INSTANCE);
} }
private static void setupTokens(InetSocketAddress resourceManagerAddress) private static void setAMRMTokenService(final Configuration conf)
throws IOException { throws IOException {
// It is assumed for now that the only AMRMToken in AM's UGI is for this
// cluster/RM. TODO: Fix later when we have some kind of cluster-ID as
// default service-address, see YARN-1779.
for (Token<? extends TokenIdentifier> token : UserGroupInformation for (Token<? extends TokenIdentifier> token : UserGroupInformation
.getCurrentUser().getTokens()) { .getCurrentUser().getTokens()) {
if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) { if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) {
// This token needs to be directly provided to the AMs, so set the token.setService(getAMRMTokenService(conf));
// appropriate service-name. We'll need more infrastructure when we
// need to set it in HA case.
SecurityUtil.setTokenService(token, resourceManagerAddress);
} }
} }
} }
@InterfaceAudience.Private @Private
@Override @Override
protected InetSocketAddress getRMAddress(YarnConfiguration conf, protected InetSocketAddress getRMAddress(YarnConfiguration conf,
Class<?> protocol) throws IOException { Class<?> protocol) throws IOException {
@ -100,12 +96,10 @@ protected InetSocketAddress getRMAddress(YarnConfiguration conf,
YarnConfiguration.DEFAULT_RM_ADMIN_ADDRESS, YarnConfiguration.DEFAULT_RM_ADMIN_ADDRESS,
YarnConfiguration.DEFAULT_RM_ADMIN_PORT); YarnConfiguration.DEFAULT_RM_ADMIN_PORT);
} else if (protocol == ApplicationMasterProtocol.class) { } else if (protocol == ApplicationMasterProtocol.class) {
InetSocketAddress serviceAddr = setAMRMTokenService(conf);
conf.getSocketAddr(YarnConfiguration.RM_SCHEDULER_ADDRESS, return conf.getSocketAddr(YarnConfiguration.RM_SCHEDULER_ADDRESS,
YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS, YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT); YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT);
setupTokens(serviceAddr);
return serviceAddr;
} else { } else {
String message = "Unsupported protocol found when creating the proxy " + String message = "Unsupported protocol found when creating the proxy " +
"connection to ResourceManager: " + "connection to ResourceManager: " +
@ -115,7 +109,7 @@ protected InetSocketAddress getRMAddress(YarnConfiguration conf,
} }
} }
@InterfaceAudience.Private @Private
@Override @Override
protected void checkAllowedProtocols(Class<?> protocol) { protected void checkAllowedProtocols(Class<?> protocol) {
Preconditions.checkArgument( Preconditions.checkArgument(
@ -132,8 +126,23 @@ protected void checkAllowedProtocols(Class<?> protocol) {
* RMDelegationToken for * RMDelegationToken for
* @return - Service name for RMDelegationToken * @return - Service name for RMDelegationToken
*/ */
@InterfaceStability.Unstable @Unstable
public static Text getRMDelegationTokenService(Configuration conf) { public static Text getRMDelegationTokenService(Configuration conf) {
return getTokenService(conf, YarnConfiguration.RM_ADDRESS,
YarnConfiguration.DEFAULT_RM_ADDRESS,
YarnConfiguration.DEFAULT_RM_PORT);
}
@Unstable
public static Text getAMRMTokenService(Configuration conf) {
return getTokenService(conf, YarnConfiguration.RM_SCHEDULER_ADDRESS,
YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT);
}
@Unstable
public static Text getTokenService(Configuration conf, String address,
String defaultAddr, int defaultPort) {
if (HAUtil.isHAEnabled(conf)) { if (HAUtil.isHAEnabled(conf)) {
// Build a list of service addresses to form the service name // Build a list of service addresses to form the service name
ArrayList<String> services = new ArrayList<String>(); ArrayList<String> services = new ArrayList<String>();
@ -142,17 +151,14 @@ public static Text getRMDelegationTokenService(Configuration conf) {
// Set RM_ID to get the corresponding RM_ADDRESS // Set RM_ID to get the corresponding RM_ADDRESS
yarnConf.set(YarnConfiguration.RM_HA_ID, rmId); yarnConf.set(YarnConfiguration.RM_HA_ID, rmId);
services.add(SecurityUtil.buildTokenService( services.add(SecurityUtil.buildTokenService(
yarnConf.getSocketAddr(YarnConfiguration.RM_ADDRESS, yarnConf.getSocketAddr(address, defaultAddr, defaultPort))
YarnConfiguration.DEFAULT_RM_ADDRESS, .toString());
YarnConfiguration.DEFAULT_RM_PORT)).toString());
} }
return new Text(Joiner.on(',').join(services)); return new Text(Joiner.on(',').join(services));
} }
// Non-HA case - no need to set RM_ID // Non-HA case - no need to set RM_ID
return SecurityUtil.buildTokenService( return SecurityUtil.buildTokenService(conf.getSocketAddr(address,
conf.getSocketAddr(YarnConfiguration.RM_ADDRESS, defaultAddr, defaultPort));
YarnConfiguration.DEFAULT_RM_ADDRESS,
YarnConfiguration.DEFAULT_RM_PORT));
} }
} }

View File

@ -48,11 +48,18 @@ public Token<AMRMTokenIdentifier> selectToken(Text service,
LOG.debug("Token kind is " + token.getKind().toString() LOG.debug("Token kind is " + token.getKind().toString()
+ " and the token's service name is " + token.getService()); + " and the token's service name is " + token.getService());
if (AMRMTokenIdentifier.KIND_NAME.equals(token.getKind()) if (AMRMTokenIdentifier.KIND_NAME.equals(token.getKind())
&& service.equals(token.getService())) { && checkService(service, token)) {
return (Token<AMRMTokenIdentifier>) token; return (Token<AMRMTokenIdentifier>) token;
} }
} }
return null; return null;
} }
private boolean checkService(Text service,
Token<? extends TokenIdentifier> token) {
if (service == null || token.getService() == null) {
return false;
}
return token.getService().toString().contains(service.toString());
}
} }

View File

@ -56,4 +56,34 @@ public void testGetRMDelegationTokenService() {
service.contains(defaultRMAddress)); service.contains(defaultRMAddress));
} }
} }
@Test
public void testGetAMRMTokenService() {
String defaultRMAddress = YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS;
YarnConfiguration conf = new YarnConfiguration();
// HA is not enabled
Text tokenService = ClientRMProxy.getAMRMTokenService(conf);
String[] services = tokenService.toString().split(",");
assertEquals(1, services.length);
for (String service : services) {
assertTrue("Incorrect token service name",
service.contains(defaultRMAddress));
}
// HA is enabled
conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
conf.set(YarnConfiguration.RM_HA_IDS, "rm1,rm2");
conf.set(HAUtil.addSuffix(YarnConfiguration.RM_HOSTNAME, "rm1"),
"0.0.0.0");
conf.set(HAUtil.addSuffix(YarnConfiguration.RM_HOSTNAME, "rm2"),
"0.0.0.0");
tokenService = ClientRMProxy.getAMRMTokenService(conf);
services = tokenService.toString().split(",");
assertEquals(2, services.length);
for (String service : services) {
assertTrue("Incorrect token service name",
service.contains(defaultRMAddress));
}
}
} }