HADOOP-17975. Fallback to simple auth does not work for a secondary DistributedFileSystem instance. (#3579)

This commit is contained in:
Istvan Fajth 2021-11-24 11:44:57 +01:00 committed by GitHub
parent 08f3df3ea2
commit ae3ba45db5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 216 additions and 85 deletions

View File

@ -807,7 +807,9 @@ public class Client implements AutoCloseable {
*/
private synchronized void setupIOstreams(
AtomicBoolean fallbackToSimpleAuth) {
try {
if (socket != null || shouldCloseConnection.get()) {
setFallBackToSimpleAuth(fallbackToSimpleAuth);
return;
}
UserGroupInformation ticket = remoteId.getTicket();
@ -817,7 +819,6 @@ public class Client implements AutoCloseable {
ticket = realUser;
}
}
try {
connectingThread.set(Thread.currentThread());
if (LOG.isDebugEnabled()) {
LOG.debug("Connecting to "+server);
@ -863,20 +864,8 @@ public class Client implements AutoCloseable {
remoteId.saslQop =
(String)saslRpcClient.getNegotiatedProperty(Sasl.QOP);
LOG.debug("Negotiated QOP is :" + remoteId.saslQop);
if (fallbackToSimpleAuth != null) {
fallbackToSimpleAuth.set(false);
}
} else if (UserGroupInformation.isSecurityEnabled()) {
if (!fallbackAllowed) {
throw new AccessControlException(
"Server asks us to fall back to SIMPLE " +
"auth, but this client is configured to only allow secure " +
"connections.");
}
if (fallbackToSimpleAuth != null) {
fallbackToSimpleAuth.set(true);
}
}
setFallBackToSimpleAuth(fallbackToSimpleAuth);
}
if (doPing) {
@ -910,6 +899,40 @@ public class Client implements AutoCloseable {
}
}
private void setFallBackToSimpleAuth(AtomicBoolean fallbackToSimpleAuth)
throws AccessControlException {
if (authMethod == null || authProtocol != AuthProtocol.SASL) {
if (authProtocol == AuthProtocol.SASL) {
LOG.trace("Auth method is not set, yield from setting auth fallback.");
}
return;
}
if (fallbackToSimpleAuth == null) {
// this should happen only during testing.
LOG.trace("Connection {} will skip to set fallbackToSimpleAuth as it is null.", remoteId);
} else {
if (fallbackToSimpleAuth.get()) {
// we already set the value to true, we do not need to examine again.
return;
}
}
if (authMethod != AuthMethod.SIMPLE) {
if (fallbackToSimpleAuth != null) {
LOG.trace("Disabling fallbackToSimpleAuth, target does not use SIMPLE authentication.");
fallbackToSimpleAuth.set(false);
}
} else if (UserGroupInformation.isSecurityEnabled()) {
if (!fallbackAllowed) {
throw new AccessControlException("Server asks us to fall back to SIMPLE auth, but this "
+ "client is configured to only allow secure connections.");
}
if (fallbackToSimpleAuth != null) {
LOG.trace("Enabling fallbackToSimpleAuth for target, as we are allowed to fall back.");
fallbackToSimpleAuth.set(true);
}
}
}
private void closeConnection() {
if (socket == null) {
return;

View File

@ -22,6 +22,7 @@ import org.apache.hadoop.thirdparty.protobuf.BlockingService;
import org.apache.hadoop.thirdparty.protobuf.RpcController;
import org.apache.hadoop.thirdparty.protobuf.ServiceException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.hadoop.conf.Configuration;
@ -124,18 +125,19 @@ public class TestRpcBase {
return server;
}
protected static TestRpcService getClient(InetSocketAddress serverAddr,
Configuration clientConf)
protected static TestRpcService getClient(InetSocketAddress serverAddr, Configuration clientConf)
throws ServiceException {
try {
return RPC.getProxy(TestRpcService.class, 0, serverAddr, clientConf);
} catch (IOException e) {
throw new ServiceException(e);
}
return getClient(serverAddr, clientConf, null);
}
protected static TestRpcService getClient(InetSocketAddress serverAddr,
Configuration clientConf, final RetryPolicy connectionRetryPolicy)
Configuration clientConf, RetryPolicy connectionRetryPolicy) throws ServiceException {
return getClient(serverAddr, clientConf, connectionRetryPolicy, null);
}
protected static TestRpcService getClient(InetSocketAddress serverAddr,
Configuration clientConf, final RetryPolicy connectionRetryPolicy,
AtomicBoolean fallbackToSimpleAuth)
throws ServiceException {
try {
return RPC.getProtocolProxy(
@ -146,7 +148,7 @@ public class TestRpcBase {
clientConf,
NetUtils.getDefaultSocketFactory(clientConf),
RPC.getRpcTimeout(clientConf),
connectionRetryPolicy, null).getProxy();
connectionRetryPolicy, fallbackToSimpleAuth).getProxy();
} catch (IOException e) {
throw new ServiceException(e);
}

View File

@ -72,6 +72,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;
@ -569,6 +570,72 @@ public class TestSaslRPC extends TestRpcBase {
assertAuthEquals(SIMPLE, getAuthMethod(KERBEROS, SIMPLE, UseToken.OTHER));
}
/**
* In DfsClient there is a fallback mechanism to simple auth, which passes in an atomic boolean
* to the ipc Client, which then sets it during setupIOStreams.
* SetupIOStreams were running only once per connection, so if two separate DfsClient was
* instantiated, then due to the connection caching inside the ipc client, the second DfsClient
* did not have the passed in atomic boolean set properly if the first client was not yet closed,
* as setupIOStreams was yielding to set up new streams as it has reused the already existing
* connection.
* This test mimics this behaviour, and asserts the fallback whether it is set correctly.
* @see <a href="https://issues.apache.org/jira/browse/HADOOP-17975">HADOOP-17975</a>
*/
@Test
public void testClientFallbackToSimpleAuthForASecondClient() throws Exception {
Configuration serverConf = createConfForAuth(SIMPLE);
Server server = startServer(serverConf,
setupServerUgi(SIMPLE, serverConf),
createServerSecretManager(SIMPLE, new TestTokenSecretManager()));
final InetSocketAddress serverAddress = NetUtils.getConnectAddress(server);
clientFallBackToSimpleAllowed = true;
Configuration clientConf = createConfForAuth(KERBEROS);
UserGroupInformation clientUgi = setupClientUgi(KERBEROS, clientConf);
AtomicBoolean fallbackToSimpleAuth1 = new AtomicBoolean();
AtomicBoolean fallbackToSimpleAuth2 = new AtomicBoolean();
try {
LOG.info("trying ugi:"+ clientUgi +" tokens:"+ clientUgi.getTokens());
clientUgi.doAs((PrivilegedExceptionAction<Void>) () -> {
TestRpcService proxy1 = null;
TestRpcService proxy2 = null;
try {
proxy1 = getClient(serverAddress, clientConf, null, fallbackToSimpleAuth1);
proxy1.ping(null, newEmptyRequest());
// make sure the other side thinks we are who we said we are!!!
assertEquals(clientUgi.getUserName(),
proxy1.getAuthUser(null, newEmptyRequest()).getUser());
AuthMethod authMethod =
convert(proxy1.getAuthMethod(null, newEmptyRequest()));
assertAuthEquals(SIMPLE, authMethod.toString());
proxy2 = getClient(serverAddress, clientConf, null, fallbackToSimpleAuth2);
proxy2.ping(null, newEmptyRequest());
// make sure the other side thinks we are who we said we are!!!
assertEquals(clientUgi.getUserName(),
proxy2.getAuthUser(null, newEmptyRequest()).getUser());
AuthMethod authMethod2 =
convert(proxy2.getAuthMethod(null, newEmptyRequest()));
assertAuthEquals(SIMPLE, authMethod2.toString());
} finally {
if (proxy1 != null) {
RPC.stopProxy(proxy1);
}
if (proxy2 != null) {
RPC.stopProxy(proxy2);
}
}
return null;
});
} finally {
server.stop();
}
assertTrue("First client does not set to fall back properly.", fallbackToSimpleAuth1.get());
assertTrue("Second client does not set to fall back properly.", fallbackToSimpleAuth2.get());
}
@Test
public void testNoClientFallbackToSimple()
throws Exception {
@ -821,16 +888,38 @@ public class TestSaslRPC extends TestRpcBase {
final AuthMethod serverAuth,
final UseToken tokenType) throws Exception {
final Configuration serverConf = new Configuration(conf);
serverConf.set(HADOOP_SECURITY_AUTHENTICATION, serverAuth.toString());
UserGroupInformation.setConfiguration(serverConf);
final UserGroupInformation serverUgi = (serverAuth == KERBEROS)
? UserGroupInformation.createRemoteUser("server/localhost@NONE")
: UserGroupInformation.createRemoteUser("server");
serverUgi.setAuthenticationMethod(serverAuth);
final TestTokenSecretManager sm = new TestTokenSecretManager();
Configuration serverConf = createConfForAuth(serverAuth);
Server server = startServer(
serverConf,
setupServerUgi(serverAuth, serverConf),
createServerSecretManager(serverAuth, sm));
final InetSocketAddress serverAddress = NetUtils.getConnectAddress(server);
final Configuration clientConf = createConfForAuth(clientAuth);
final UserGroupInformation clientUgi = setupClientUgi(clientAuth, clientConf);
setupTokenIfNeeded(tokenType, sm, clientUgi, serverAddress);
try {
return createClientAndQueryAuthMethod(serverAddress, clientConf, clientUgi, null);
} finally {
server.stop();
}
}
private Configuration createConfForAuth(AuthMethod clientAuth) {
final Configuration clientConf = new Configuration(conf);
clientConf.set(HADOOP_SECURITY_AUTHENTICATION, clientAuth.toString());
clientConf.setBoolean(
CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY,
clientFallBackToSimpleAllowed);
return clientConf;
}
private SecretManager<?> createServerSecretManager(
AuthMethod serverAuth, TestTokenSecretManager sm) {
boolean useSecretManager = (serverAuth != SIMPLE);
if (enableSecretManager != null) {
useSecretManager &= enableSecretManager;
@ -839,26 +928,43 @@ public class TestSaslRPC extends TestRpcBase {
useSecretManager |= forceSecretManager;
}
final SecretManager<?> serverSm = useSecretManager ? sm : null;
return serverSm;
}
private Server startServer(Configuration serverConf, UserGroupInformation serverUgi,
SecretManager<?> serverSm) throws IOException, InterruptedException {
Server server = serverUgi.doAs(new PrivilegedExceptionAction<Server>() {
@Override
public Server run() throws IOException {
return setupTestServer(serverConf, 5, serverSm);
}
});
return server;
}
final Configuration clientConf = new Configuration(conf);
clientConf.set(HADOOP_SECURITY_AUTHENTICATION, clientAuth.toString());
clientConf.setBoolean(
CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY,
clientFallBackToSimpleAllowed);
private UserGroupInformation setupServerUgi(AuthMethod serverAuth,
Configuration serverConf) {
UserGroupInformation.setConfiguration(serverConf);
final UserGroupInformation serverUgi = (serverAuth == KERBEROS)
? UserGroupInformation.createRemoteUser("server/localhost@NONE")
: UserGroupInformation.createRemoteUser("server");
serverUgi.setAuthenticationMethod(serverAuth);
return serverUgi;
}
private UserGroupInformation setupClientUgi(AuthMethod clientAuth,
Configuration clientConf) {
UserGroupInformation.setConfiguration(clientConf);
final UserGroupInformation clientUgi =
UserGroupInformation.createRemoteUser("client");
clientUgi.setAuthenticationMethod(clientAuth);
return clientUgi;
}
final InetSocketAddress addr = NetUtils.getConnectAddress(server);
private void setupTokenIfNeeded(UseToken tokenType, TestTokenSecretManager sm,
UserGroupInformation clientUgi, InetSocketAddress addr) {
if (tokenType != UseToken.NONE) {
TestTokenIdentifier tokenId = new TestTokenIdentifier(
new Text(clientUgi.getUserName()));
@ -881,15 +987,18 @@ public class TestSaslRPC extends TestRpcBase {
}
clientUgi.addToken(token);
}
}
try {
LOG.info("trying ugi:"+clientUgi+" tokens:"+clientUgi.getTokens());
private String createClientAndQueryAuthMethod(InetSocketAddress serverAddress,
Configuration clientConf, UserGroupInformation clientUgi, AtomicBoolean fallbackToSimpleAuth)
throws IOException, InterruptedException {
LOG.info("trying ugi:"+ clientUgi +" tokens:"+ clientUgi.getTokens());
return clientUgi.doAs(new PrivilegedExceptionAction<String>() {
@Override
public String run() throws IOException {
TestRpcService proxy = null;
try {
proxy = getClient(addr, clientConf);
proxy = getClient(serverAddress, clientConf, null, fallbackToSimpleAuth);
proxy.ping(null, newEmptyRequest());
// make sure the other side thinks we are who we said we are!!!
@ -916,9 +1025,6 @@ public class TestSaslRPC extends TestRpcBase {
}
}
});
} finally {
server.stop();
}
}
private static void assertAuthEquals(AuthMethod expect,