YARN-6602. Impersonation does not work if standby RM is contacted first (rkanter)
This commit is contained in:
parent
7851ae12cf
commit
faf36776c7
|
@ -48,7 +48,6 @@ import com.google.common.base.Preconditions;
|
|||
@InterfaceStability.Stable
|
||||
public class ClientRMProxy<T> extends RMProxy<T> {
|
||||
private static final Log LOG = LogFactory.getLog(ClientRMProxy.class);
|
||||
private static final ClientRMProxy INSTANCE = new ClientRMProxy();
|
||||
|
||||
private interface ClientRMProtocols extends ApplicationClientProtocol,
|
||||
ApplicationMasterProtocol, ResourceManagerAdministrationProtocol {
|
||||
|
@ -69,7 +68,8 @@ public class ClientRMProxy<T> extends RMProxy<T> {
|
|||
*/
|
||||
public static <T> T createRMProxy(final Configuration configuration,
|
||||
final Class<T> protocol) throws IOException {
|
||||
return createRMProxy(configuration, protocol, INSTANCE);
|
||||
ClientRMProxy<T> clientRMProxy = new ClientRMProxy<>();
|
||||
return createRMProxy(configuration, protocol, clientRMProxy);
|
||||
}
|
||||
|
||||
private static void setAMRMTokenService(final Configuration conf)
|
||||
|
|
|
@ -74,7 +74,7 @@ public class ConfiguredRMFailoverProxyProvider<T>
|
|||
protected T getProxyInternal() {
|
||||
try {
|
||||
final InetSocketAddress rmAddress = rmProxy.getRMAddress(conf, protocol);
|
||||
return RMProxy.getProxy(conf, protocol, rmAddress);
|
||||
return rmProxy.getProxy(conf, protocol, rmAddress);
|
||||
} catch (IOException ioe) {
|
||||
LOG.error("Unable to create proxy to the ResourceManager " +
|
||||
rmServiceIds[currentProxyIndex], ioe);
|
||||
|
|
|
@ -57,8 +57,15 @@ import com.google.common.annotations.VisibleForTesting;
|
|||
public class RMProxy<T> {
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(RMProxy.class);
|
||||
private UserGroupInformation user;
|
||||
|
||||
protected RMProxy() {}
|
||||
protected RMProxy() {
|
||||
try {
|
||||
this.user = UserGroupInformation.getCurrentUser();
|
||||
} catch (IOException ioe) {
|
||||
throw new YarnRuntimeException("Unable to determine user", ioe);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Verify the passed protocol is supported.
|
||||
|
@ -86,7 +93,7 @@ public class RMProxy<T> {
|
|||
*/
|
||||
@Private
|
||||
protected static <T> T createRMProxy(final Configuration configuration,
|
||||
final Class<T> protocol, RMProxy instance) throws IOException {
|
||||
final Class<T> protocol, RMProxy<T> instance) throws IOException {
|
||||
YarnConfiguration conf = (configuration instanceof YarnConfiguration)
|
||||
? (YarnConfiguration) configuration
|
||||
: new YarnConfiguration(configuration);
|
||||
|
@ -103,7 +110,7 @@ public class RMProxy<T> {
|
|||
*/
|
||||
@Private
|
||||
protected static <T> T createRMProxy(final Configuration configuration,
|
||||
final Class<T> protocol, RMProxy instance, final long retryTime,
|
||||
final Class<T> protocol, RMProxy<T> instance, final long retryTime,
|
||||
final long retryInterval) throws IOException {
|
||||
YarnConfiguration conf = (configuration instanceof YarnConfiguration)
|
||||
? (YarnConfiguration) configuration
|
||||
|
@ -114,7 +121,7 @@ public class RMProxy<T> {
|
|||
}
|
||||
|
||||
private static <T> T newProxyInstance(final YarnConfiguration conf,
|
||||
final Class<T> protocol, RMProxy instance, RetryPolicy retryPolicy)
|
||||
final Class<T> protocol, RMProxy<T> instance, RetryPolicy retryPolicy)
|
||||
throws IOException{
|
||||
if (HAUtil.isHAEnabled(conf)) {
|
||||
RMFailoverProxyProvider<T> provider =
|
||||
|
@ -123,44 +130,20 @@ public class RMProxy<T> {
|
|||
} else {
|
||||
InetSocketAddress rmAddress = instance.getRMAddress(conf, protocol);
|
||||
LOG.info("Connecting to ResourceManager at " + rmAddress);
|
||||
T proxy = RMProxy.<T>getProxy(conf, protocol, rmAddress);
|
||||
T proxy = instance.getProxy(conf, protocol, rmAddress);
|
||||
return (T) RetryProxy.create(protocol, proxy, retryPolicy);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @deprecated
|
||||
* This method is deprecated and is not used by YARN internally any more.
|
||||
* To create a proxy to the RM, use ClientRMProxy#createRMProxy or
|
||||
* ServerRMProxy#createRMProxy.
|
||||
*
|
||||
* Create a proxy to the ResourceManager at the specified address.
|
||||
*
|
||||
* @param conf Configuration to generate retry policy
|
||||
* @param protocol Protocol for the proxy
|
||||
* @param rmAddress Address of the ResourceManager
|
||||
* @param <T> Type information of the proxy
|
||||
* @return Proxy to the RM
|
||||
* @throws IOException
|
||||
*/
|
||||
@Deprecated
|
||||
public static <T> T createRMProxy(final Configuration conf,
|
||||
final Class<T> protocol, InetSocketAddress rmAddress) throws IOException {
|
||||
RetryPolicy retryPolicy = createRetryPolicy(conf, HAUtil.isHAEnabled(conf));
|
||||
T proxy = RMProxy.<T>getProxy(conf, protocol, rmAddress);
|
||||
LOG.info("Connecting to ResourceManager at " + rmAddress);
|
||||
return (T) RetryProxy.create(protocol, proxy, retryPolicy);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a proxy to the RM at the specified address. To be used to create a
|
||||
* RetryProxy.
|
||||
*/
|
||||
@Private
|
||||
static <T> T getProxy(final Configuration conf,
|
||||
<T> T getProxy(final Configuration conf,
|
||||
final Class<T> protocol, final InetSocketAddress rmAddress)
|
||||
throws IOException {
|
||||
return UserGroupInformation.getCurrentUser().doAs(
|
||||
return user.doAs(
|
||||
new PrivilegedAction<T>() {
|
||||
@Override
|
||||
public T run() {
|
||||
|
|
|
@ -95,7 +95,7 @@ public class RequestHedgingRMFailoverProxyProvider<T>
|
|||
// Create proxy that can retry exceptions properly.
|
||||
RetryPolicy retryPolicy = RMProxy.createRetryPolicy(conf, false);
|
||||
InetSocketAddress rmAddress = rmProxy.getRMAddress(conf, protocol);
|
||||
T proxy = RMProxy.<T> getProxy(conf, protocol, rmAddress);
|
||||
T proxy = rmProxy.getProxy(conf, protocol, rmAddress);
|
||||
return (T) RetryProxy.create(protocol, proxy, retryPolicy);
|
||||
} catch (IOException ioe) {
|
||||
LOG.error("Unable to create proxy to the ResourceManager "
|
||||
|
|
|
@ -18,12 +18,26 @@
|
|||
|
||||
package org.apache.hadoop.yarn.client;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
|
||||
import org.apache.hadoop.yarn.conf.HAUtil;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC;
|
||||
import org.apache.hadoop.yarn.ipc.YarnRPC;
|
||||
import org.apache.hadoop.yarn.util.Records;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
public class TestClientRMProxy {
|
||||
|
@ -86,4 +100,99 @@ public class TestClientRMProxy {
|
|||
service.contains(defaultRMAddress));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Verify that the RPC layer is always created using the correct UGI from the
|
||||
* RMProxy. It should always use the UGI from creation in subsequent uses,
|
||||
* even outside of a doAs.
|
||||
*
|
||||
* @throws Exception an Exception occurred
|
||||
*/
|
||||
@Test
|
||||
public void testProxyUserCorrectUGI() throws Exception {
|
||||
final YarnConfiguration conf = new YarnConfiguration();
|
||||
conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
|
||||
conf.set(YarnConfiguration.RM_HA_IDS, "rm1,rm2");
|
||||
conf.set(HAUtil.addSuffix(YarnConfiguration.RM_HOSTNAME, "rm1"),
|
||||
"0.0.0.0");
|
||||
conf.set(HAUtil.addSuffix(YarnConfiguration.RM_HOSTNAME, "rm2"),
|
||||
"0.0.0.0");
|
||||
conf.setLong(YarnConfiguration.CLIENT_FAILOVER_MAX_ATTEMPTS, 2);
|
||||
conf.setLong(YarnConfiguration.RESOURCEMANAGER_CONNECT_MAX_WAIT_MS, 2);
|
||||
conf.setLong(
|
||||
YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS, 2);
|
||||
|
||||
// Replace the RPC implementation with one that will capture the current UGI
|
||||
conf.setClass(YarnConfiguration.IPC_RPC_IMPL,
|
||||
UGICapturingHadoopYarnProtoRPC.class, YarnRPC.class);
|
||||
|
||||
UserGroupInformation realUser = UserGroupInformation.getCurrentUser();
|
||||
UserGroupInformation proxyUser =
|
||||
UserGroupInformation.createProxyUserForTesting("proxy", realUser,
|
||||
new String[] {"group1"});
|
||||
|
||||
// Create the RMProxy using the proxyUser
|
||||
ApplicationClientProtocol rmProxy = proxyUser.doAs(
|
||||
new PrivilegedExceptionAction<ApplicationClientProtocol>() {
|
||||
@Override
|
||||
public ApplicationClientProtocol run() throws Exception {
|
||||
return ClientRMProxy.createRMProxy(conf,
|
||||
ApplicationClientProtocol.class);
|
||||
}
|
||||
});
|
||||
|
||||
// It was in a doAs, so the UGI should be correct
|
||||
assertUGI();
|
||||
|
||||
// Try to use the RMProxy, which should trigger the RPC again
|
||||
GetNewApplicationRequest request =
|
||||
Records.newRecord(GetNewApplicationRequest.class);
|
||||
UGICapturingHadoopYarnProtoRPC.lastCurrentUser = null;
|
||||
try {
|
||||
rmProxy.getNewApplication(request);
|
||||
} catch (IOException ioe) {
|
||||
// ignore - RMs are not running so this is expected to fail
|
||||
}
|
||||
|
||||
// This time it was outside a doAs, but make sure the UGI was still correct
|
||||
assertUGI();
|
||||
}
|
||||
|
||||
private void assertUGI() throws IOException {
|
||||
UserGroupInformation lastCurrentUser =
|
||||
UGICapturingHadoopYarnProtoRPC.lastCurrentUser;
|
||||
assertNotNull(lastCurrentUser);
|
||||
assertEquals("proxy", lastCurrentUser.getShortUserName());
|
||||
Assert.assertEquals(UserGroupInformation.AuthenticationMethod.PROXY,
|
||||
lastCurrentUser.getAuthenticationMethod());
|
||||
assertEquals(UserGroupInformation.getCurrentUser(),
|
||||
lastCurrentUser.getRealUser());
|
||||
// Reset UGICapturingHadoopYarnProtoRPC
|
||||
UGICapturingHadoopYarnProtoRPC.lastCurrentUser = null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Subclass of {@link HadoopYarnProtoRPC} which captures the current UGI in
|
||||
* a static variable. Used by {@link #testProxyUserCorrectUGI()}.
|
||||
*/
|
||||
public static class UGICapturingHadoopYarnProtoRPC
|
||||
extends HadoopYarnProtoRPC {
|
||||
|
||||
static UserGroupInformation lastCurrentUser = null;
|
||||
|
||||
@Override
|
||||
public Object getProxy(Class protocol, InetSocketAddress addr,
|
||||
Configuration conf) {
|
||||
UserGroupInformation currentUser = null;
|
||||
try {
|
||||
currentUser = UserGroupInformation.getCurrentUser();
|
||||
} catch (IOException ioe) {
|
||||
Assert.fail("Unable to get current user\n"
|
||||
+ StringUtils.stringifyException(ioe));
|
||||
}
|
||||
lastCurrentUser = currentUser;
|
||||
|
||||
return super.getProxy(protocol, addr, conf);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -32,7 +32,6 @@ import com.google.common.base.Preconditions;
|
|||
|
||||
public class ServerRMProxy<T> extends RMProxy<T> {
|
||||
private static final Log LOG = LogFactory.getLog(ServerRMProxy.class);
|
||||
private static final ServerRMProxy INSTANCE = new ServerRMProxy();
|
||||
|
||||
private ServerRMProxy() {
|
||||
super();
|
||||
|
@ -65,7 +64,8 @@ public class ServerRMProxy<T> extends RMProxy<T> {
|
|||
configuration.getLong(
|
||||
YarnConfiguration.NM_RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS,
|
||||
rmRetryInterval);
|
||||
return createRMProxy(configuration, protocol, INSTANCE,
|
||||
ServerRMProxy<T> serverRMProxy = new ServerRMProxy<>();
|
||||
return createRMProxy(configuration, protocol, serverRMProxy,
|
||||
nmRmConnectWait, nmRmRetryInterval);
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue