diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
index 96f925f0f2c..49432aff117 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
@@ -807,17 +807,18 @@ public class Client implements AutoCloseable {
*/
private synchronized void setupIOstreams(
AtomicBoolean fallbackToSimpleAuth) {
- if (socket != null || shouldCloseConnection.get()) {
- return;
- }
- UserGroupInformation ticket = remoteId.getTicket();
- if (ticket != null) {
- final UserGroupInformation realUser = ticket.getRealUser();
- if (realUser != null) {
- ticket = realUser;
- }
- }
try {
+ if (socket != null || shouldCloseConnection.get()) {
+ setFallBackToSimpleAuth(fallbackToSimpleAuth);
+ return;
+ }
+ UserGroupInformation ticket = remoteId.getTicket();
+ if (ticket != null) {
+ final UserGroupInformation realUser = ticket.getRealUser();
+ if (realUser != null) {
+ ticket = realUser;
+ }
+ }
connectingThread.set(Thread.currentThread());
if (LOG.isDebugEnabled()) {
LOG.debug("Connecting to "+server);
@@ -863,20 +864,8 @@ public class Client implements AutoCloseable {
remoteId.saslQop =
(String)saslRpcClient.getNegotiatedProperty(Sasl.QOP);
LOG.debug("Negotiated QOP is :" + remoteId.saslQop);
- if (fallbackToSimpleAuth != null) {
- fallbackToSimpleAuth.set(false);
- }
- } else if (UserGroupInformation.isSecurityEnabled()) {
- if (!fallbackAllowed) {
- throw new AccessControlException(
- "Server asks us to fall back to SIMPLE " +
- "auth, but this client is configured to only allow secure " +
- "connections.");
- }
- if (fallbackToSimpleAuth != null) {
- fallbackToSimpleAuth.set(true);
- }
}
+ setFallBackToSimpleAuth(fallbackToSimpleAuth);
}
if (doPing) {
@@ -909,7 +898,41 @@ public class Client implements AutoCloseable {
connectingThread.set(null);
}
}
-
+
+ private void setFallBackToSimpleAuth(AtomicBoolean fallbackToSimpleAuth)
+ throws AccessControlException {
+ if (authMethod == null || authProtocol != AuthProtocol.SASL) {
+ if (authProtocol == AuthProtocol.SASL) {
+ LOG.trace("Auth method is not set, yield from setting auth fallback.");
+ }
+ return;
+ }
+ if (fallbackToSimpleAuth == null) {
+ // this should happen only during testing.
+ LOG.trace("Connection {} will skip to set fallbackToSimpleAuth as it is null.", remoteId);
+ } else {
+ if (fallbackToSimpleAuth.get()) {
+ // we already set the value to true, we do not need to examine again.
+ return;
+ }
+ }
+ if (authMethod != AuthMethod.SIMPLE) {
+ if (fallbackToSimpleAuth != null) {
+ LOG.trace("Disabling fallbackToSimpleAuth, target does not use SIMPLE authentication.");
+ fallbackToSimpleAuth.set(false);
+ }
+ } else if (UserGroupInformation.isSecurityEnabled()) {
+ if (!fallbackAllowed) {
+ throw new AccessControlException("Server asks us to fall back to SIMPLE auth, but this "
+ + "client is configured to only allow secure connections.");
+ }
+ if (fallbackToSimpleAuth != null) {
+ LOG.trace("Enabling fallbackToSimpleAuth for target, as we are allowed to fall back.");
+ fallbackToSimpleAuth.set(true);
+ }
+ }
+ }
+
private void closeConnection() {
if (socket == null) {
return;
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRpcBase.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRpcBase.java
index 0962b50099c..e9019e3d24e 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRpcBase.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRpcBase.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.thirdparty.protobuf.BlockingService;
import org.apache.hadoop.thirdparty.protobuf.RpcController;
import org.apache.hadoop.thirdparty.protobuf.ServiceException;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.hadoop.conf.Configuration;
@@ -124,18 +125,19 @@ public class TestRpcBase {
return server;
}
- protected static TestRpcService getClient(InetSocketAddress serverAddr,
- Configuration clientConf)
+ protected static TestRpcService getClient(InetSocketAddress serverAddr, Configuration clientConf)
throws ServiceException {
- try {
- return RPC.getProxy(TestRpcService.class, 0, serverAddr, clientConf);
- } catch (IOException e) {
- throw new ServiceException(e);
- }
+ return getClient(serverAddr, clientConf, null);
}
protected static TestRpcService getClient(InetSocketAddress serverAddr,
- Configuration clientConf, final RetryPolicy connectionRetryPolicy)
+ Configuration clientConf, RetryPolicy connectionRetryPolicy) throws ServiceException {
+ return getClient(serverAddr, clientConf, connectionRetryPolicy, null);
+ }
+
+ protected static TestRpcService getClient(InetSocketAddress serverAddr,
+ Configuration clientConf, final RetryPolicy connectionRetryPolicy,
+ AtomicBoolean fallbackToSimpleAuth)
throws ServiceException {
try {
return RPC.getProtocolProxy(
@@ -146,7 +148,7 @@ public class TestRpcBase {
clientConf,
NetUtils.getDefaultSocketFactory(clientConf),
RPC.getRpcTimeout(clientConf),
- connectionRetryPolicy, null).getProxy();
+ connectionRetryPolicy, fallbackToSimpleAuth).getProxy();
} catch (IOException e) {
throw new ServiceException(e);
}
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestSaslRPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestSaslRPC.java
index 72085a19ec7..662faea5996 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestSaslRPC.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestSaslRPC.java
@@ -72,6 +72,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;
@@ -569,6 +570,72 @@ public class TestSaslRPC extends TestRpcBase {
assertAuthEquals(SIMPLE, getAuthMethod(KERBEROS, SIMPLE, UseToken.OTHER));
}
+ /**
+ * In DfsClient there is a fallback mechanism to simple auth, which passes in an atomic boolean
+ * to the ipc Client, which then sets it during setupIOStreams.
+ * SetupIOStreams were running only once per connection, so if two separate DfsClient was
+ * instantiated, then due to the connection caching inside the ipc client, the second DfsClient
+ * did not have the passed in atomic boolean set properly if the first client was not yet closed,
+ * as setupIOStreams was yielding to set up new streams as it has reused the already existing
+ * connection.
+ * This test mimics this behaviour, and asserts the fallback whether it is set correctly.
+ * @see HADOOP-17975
+ */
+ @Test
+ public void testClientFallbackToSimpleAuthForASecondClient() throws Exception {
+ Configuration serverConf = createConfForAuth(SIMPLE);
+ Server server = startServer(serverConf,
+ setupServerUgi(SIMPLE, serverConf),
+ createServerSecretManager(SIMPLE, new TestTokenSecretManager()));
+ final InetSocketAddress serverAddress = NetUtils.getConnectAddress(server);
+
+ clientFallBackToSimpleAllowed = true;
+ Configuration clientConf = createConfForAuth(KERBEROS);
+ UserGroupInformation clientUgi = setupClientUgi(KERBEROS, clientConf);
+
+ AtomicBoolean fallbackToSimpleAuth1 = new AtomicBoolean();
+ AtomicBoolean fallbackToSimpleAuth2 = new AtomicBoolean();
+ try {
+ LOG.info("trying ugi:"+ clientUgi +" tokens:"+ clientUgi.getTokens());
+ clientUgi.doAs((PrivilegedExceptionAction) () -> {
+ TestRpcService proxy1 = null;
+ TestRpcService proxy2 = null;
+ try {
+ proxy1 = getClient(serverAddress, clientConf, null, fallbackToSimpleAuth1);
+ proxy1.ping(null, newEmptyRequest());
+ // make sure the other side thinks we are who we said we are!!!
+ assertEquals(clientUgi.getUserName(),
+ proxy1.getAuthUser(null, newEmptyRequest()).getUser());
+ AuthMethod authMethod =
+ convert(proxy1.getAuthMethod(null, newEmptyRequest()));
+ assertAuthEquals(SIMPLE, authMethod.toString());
+
+ proxy2 = getClient(serverAddress, clientConf, null, fallbackToSimpleAuth2);
+ proxy2.ping(null, newEmptyRequest());
+ // make sure the other side thinks we are who we said we are!!!
+ assertEquals(clientUgi.getUserName(),
+ proxy2.getAuthUser(null, newEmptyRequest()).getUser());
+ AuthMethod authMethod2 =
+ convert(proxy2.getAuthMethod(null, newEmptyRequest()));
+ assertAuthEquals(SIMPLE, authMethod2.toString());
+ } finally {
+ if (proxy1 != null) {
+ RPC.stopProxy(proxy1);
+ }
+ if (proxy2 != null) {
+ RPC.stopProxy(proxy2);
+ }
+ }
+ return null;
+ });
+ } finally {
+ server.stop();
+ }
+
+ assertTrue("First client does not set to fall back properly.", fallbackToSimpleAuth1.get());
+ assertTrue("Second client does not set to fall back properly.", fallbackToSimpleAuth2.get());
+ }
+
@Test
public void testNoClientFallbackToSimple()
throws Exception {
@@ -815,22 +882,44 @@ public class TestSaslRPC extends TestRpcBase {
return e.toString();
}
}
-
+
private String internalGetAuthMethod(
final AuthMethod clientAuth,
final AuthMethod serverAuth,
final UseToken tokenType) throws Exception {
-
- final Configuration serverConf = new Configuration(conf);
- serverConf.set(HADOOP_SECURITY_AUTHENTICATION, serverAuth.toString());
- UserGroupInformation.setConfiguration(serverConf);
-
- final UserGroupInformation serverUgi = (serverAuth == KERBEROS)
- ? UserGroupInformation.createRemoteUser("server/localhost@NONE")
- : UserGroupInformation.createRemoteUser("server");
- serverUgi.setAuthenticationMethod(serverAuth);
final TestTokenSecretManager sm = new TestTokenSecretManager();
+
+ Configuration serverConf = createConfForAuth(serverAuth);
+ Server server = startServer(
+ serverConf,
+ setupServerUgi(serverAuth, serverConf),
+ createServerSecretManager(serverAuth, sm));
+ final InetSocketAddress serverAddress = NetUtils.getConnectAddress(server);
+
+ final Configuration clientConf = createConfForAuth(clientAuth);
+ final UserGroupInformation clientUgi = setupClientUgi(clientAuth, clientConf);
+
+ setupTokenIfNeeded(tokenType, sm, clientUgi, serverAddress);
+
+ try {
+ return createClientAndQueryAuthMethod(serverAddress, clientConf, clientUgi, null);
+ } finally {
+ server.stop();
+ }
+ }
+
+ private Configuration createConfForAuth(AuthMethod clientAuth) {
+ final Configuration clientConf = new Configuration(conf);
+ clientConf.set(HADOOP_SECURITY_AUTHENTICATION, clientAuth.toString());
+ clientConf.setBoolean(
+ CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY,
+ clientFallBackToSimpleAllowed);
+ return clientConf;
+ }
+
+ private SecretManager> createServerSecretManager(
+ AuthMethod serverAuth, TestTokenSecretManager sm) {
boolean useSecretManager = (serverAuth != SIMPLE);
if (enableSecretManager != null) {
useSecretManager &= enableSecretManager;
@@ -839,26 +928,43 @@ public class TestSaslRPC extends TestRpcBase {
useSecretManager |= forceSecretManager;
}
final SecretManager> serverSm = useSecretManager ? sm : null;
+ return serverSm;
+ }
+ private Server startServer(Configuration serverConf, UserGroupInformation serverUgi,
+ SecretManager> serverSm) throws IOException, InterruptedException {
Server server = serverUgi.doAs(new PrivilegedExceptionAction() {
@Override
public Server run() throws IOException {
return setupTestServer(serverConf, 5, serverSm);
}
});
+ return server;
+ }
- final Configuration clientConf = new Configuration(conf);
- clientConf.set(HADOOP_SECURITY_AUTHENTICATION, clientAuth.toString());
- clientConf.setBoolean(
- CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY,
- clientFallBackToSimpleAllowed);
+ private UserGroupInformation setupServerUgi(AuthMethod serverAuth,
+ Configuration serverConf) {
+ UserGroupInformation.setConfiguration(serverConf);
+
+ final UserGroupInformation serverUgi = (serverAuth == KERBEROS)
+ ? UserGroupInformation.createRemoteUser("server/localhost@NONE")
+ : UserGroupInformation.createRemoteUser("server");
+ serverUgi.setAuthenticationMethod(serverAuth);
+ return serverUgi;
+ }
+
+ private UserGroupInformation setupClientUgi(AuthMethod clientAuth,
+ Configuration clientConf) {
UserGroupInformation.setConfiguration(clientConf);
-
+
final UserGroupInformation clientUgi =
UserGroupInformation.createRemoteUser("client");
- clientUgi.setAuthenticationMethod(clientAuth);
+ clientUgi.setAuthenticationMethod(clientAuth);
+ return clientUgi;
+ }
- final InetSocketAddress addr = NetUtils.getConnectAddress(server);
+ private void setupTokenIfNeeded(UseToken tokenType, TestTokenSecretManager sm,
+ UserGroupInformation clientUgi, InetSocketAddress addr) {
if (tokenType != UseToken.NONE) {
TestTokenIdentifier tokenId = new TestTokenIdentifier(
new Text(clientUgi.getUserName()));
@@ -881,44 +987,44 @@ public class TestSaslRPC extends TestRpcBase {
}
clientUgi.addToken(token);
}
+ }
- try {
- LOG.info("trying ugi:"+clientUgi+" tokens:"+clientUgi.getTokens());
- return clientUgi.doAs(new PrivilegedExceptionAction() {
- @Override
- public String run() throws IOException {
- TestRpcService proxy = null;
- try {
- proxy = getClient(addr, clientConf);
+ private String createClientAndQueryAuthMethod(InetSocketAddress serverAddress,
+ Configuration clientConf, UserGroupInformation clientUgi, AtomicBoolean fallbackToSimpleAuth)
+ throws IOException, InterruptedException {
+ LOG.info("trying ugi:"+ clientUgi +" tokens:"+ clientUgi.getTokens());
+ return clientUgi.doAs(new PrivilegedExceptionAction() {
+ @Override
+ public String run() throws IOException {
+ TestRpcService proxy = null;
+ try {
+ proxy = getClient(serverAddress, clientConf, null, fallbackToSimpleAuth);
- proxy.ping(null, newEmptyRequest());
- // make sure the other side thinks we are who we said we are!!!
- assertEquals(clientUgi.getUserName(),
- proxy.getAuthUser(null, newEmptyRequest()).getUser());
- AuthMethod authMethod =
- convert(proxy.getAuthMethod(null, newEmptyRequest()));
- // verify sasl completed with correct QOP
- assertEquals((authMethod != SIMPLE) ? expectedQop.saslQop : null,
- RPC.getConnectionIdForProxy(proxy).getSaslQop());
- return authMethod != null ? authMethod.toString() : null;
- } catch (ServiceException se) {
- if (se.getCause() instanceof RemoteException) {
- throw (RemoteException) se.getCause();
- } else if (se.getCause() instanceof IOException) {
- throw (IOException) se.getCause();
- } else {
- throw new RuntimeException(se.getCause());
- }
- } finally {
- if (proxy != null) {
- RPC.stopProxy(proxy);
- }
+ proxy.ping(null, newEmptyRequest());
+ // make sure the other side thinks we are who we said we are!!!
+ assertEquals(clientUgi.getUserName(),
+ proxy.getAuthUser(null, newEmptyRequest()).getUser());
+ AuthMethod authMethod =
+ convert(proxy.getAuthMethod(null, newEmptyRequest()));
+ // verify sasl completed with correct QOP
+ assertEquals((authMethod != SIMPLE) ? expectedQop.saslQop : null,
+ RPC.getConnectionIdForProxy(proxy).getSaslQop());
+ return authMethod != null ? authMethod.toString() : null;
+ } catch (ServiceException se) {
+ if (se.getCause() instanceof RemoteException) {
+ throw (RemoteException) se.getCause();
+ } else if (se.getCause() instanceof IOException) {
+ throw (IOException) se.getCause();
+ } else {
+ throw new RuntimeException(se.getCause());
+ }
+ } finally {
+ if (proxy != null) {
+ RPC.stopProxy(proxy);
}
}
- });
- } finally {
- server.stop();
- }
+ }
+ });
}
private static void assertAuthEquals(AuthMethod expect,