diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java index 2f482c290ed..84fe5523ec8 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java @@ -687,7 +687,8 @@ public class Client { * a header to the server and starts * the connection thread that waits for responses. */ - private synchronized void setupIOstreams() { + private synchronized void setupIOstreams( + AtomicBoolean fallbackToSimpleAuth) { if (socket != null || shouldCloseConnection.get()) { return; } @@ -738,11 +739,18 @@ public class Client { remoteId.saslQop = (String)saslRpcClient.getNegotiatedProperty(Sasl.QOP); LOG.debug("Negotiated QOP is :" + remoteId.saslQop); - } else if (UserGroupInformation.isSecurityEnabled() && - !fallbackAllowed) { - throw new IOException("Server asks us to fall back to SIMPLE " + - "auth, but this client is configured to only allow secure " + - "connections."); + if (fallbackToSimpleAuth != null) { + fallbackToSimpleAuth.set(false); + } + } else if (UserGroupInformation.isSecurityEnabled()) { + if (!fallbackAllowed) { + throw new IOException("Server asks us to fall back to SIMPLE " + + "auth, but this client is configured to only allow secure " + + "connections."); + } + if (fallbackToSimpleAuth != null) { + fallbackToSimpleAuth.set(true); + } } } @@ -1375,6 +1383,26 @@ public class Client { /** * Make a call, passing rpcRequest, to the IPC server defined by * remoteId, returning the rpc respond. + * + * @param rpcKind + * @param rpcRequest - contains serialized method and method parameters + * @param remoteId - the target rpc server + * @param fallbackToSimpleAuth - set to true or false during this method to + * indicate if a secure client falls back to simple auth + * @returns the rpc response + * Throws exceptions if there are network problems or if the remote code + * threw an exception. + */ + public Writable call(RPC.RpcKind rpcKind, Writable rpcRequest, + ConnectionId remoteId, AtomicBoolean fallbackToSimpleAuth) + throws IOException { + return call(rpcKind, rpcRequest, remoteId, RPC.RPC_SERVICE_CLASS_DEFAULT, + fallbackToSimpleAuth); + } + + /** + * Make a call, passing rpcRequest, to the IPC server defined by + * remoteId, returning the rpc response. * * @param rpcKind * @param rpcRequest - contains serialized method and method parameters @@ -1386,8 +1414,29 @@ public class Client { */ public Writable call(RPC.RpcKind rpcKind, Writable rpcRequest, ConnectionId remoteId, int serviceClass) throws IOException { + return call(rpcKind, rpcRequest, remoteId, serviceClass, null); + } + + /** + * Make a call, passing rpcRequest, to the IPC server defined by + * remoteId, returning the rpc response. + * + * @param rpcKind + * @param rpcRequest - contains serialized method and method parameters + * @param remoteId - the target rpc server + * @param serviceClass - service class for RPC + * @param fallbackToSimpleAuth - set to true or false during this method to + * indicate if a secure client falls back to simple auth + * @returns the rpc response + * Throws exceptions if there are network problems or if the remote code + * threw an exception. + */ + public Writable call(RPC.RpcKind rpcKind, Writable rpcRequest, + ConnectionId remoteId, int serviceClass, + AtomicBoolean fallbackToSimpleAuth) throws IOException { final Call call = createCall(rpcKind, rpcRequest); - Connection connection = getConnection(remoteId, call, serviceClass); + Connection connection = getConnection(remoteId, call, serviceClass, + fallbackToSimpleAuth); try { connection.sendRpcRequest(call); // send the rpc request } catch (RejectedExecutionException e) { @@ -1444,7 +1493,8 @@ public class Client { /** Get a connection from the pool, or create a new one and add it to the * pool. Connections to a given ConnectionId are reused. */ private Connection getConnection(ConnectionId remoteId, - Call call, int serviceClass) throws IOException { + Call call, int serviceClass, AtomicBoolean fallbackToSimpleAuth) + throws IOException { if (!running.get()) { // the client is stopped throw new IOException("The client is stopped"); @@ -1468,7 +1518,7 @@ public class Client { //block above. The reason for that is if the server happens to be slow, //it will take longer to establish a connection and that will slow the //entire system down. - connection.setupIOstreams(); + connection.setupIOstreams(fallbackToSimpleAuth); return connection; } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java index 0ccdb71d0ee..124d835ab15 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java @@ -27,6 +27,7 @@ import java.lang.reflect.Proxy; import java.net.InetSocketAddress; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; import javax.net.SocketFactory; @@ -84,14 +85,23 @@ public class ProtobufRpcEngine implements RpcEngine { } @Override - @SuppressWarnings("unchecked") public ProtocolProxy getProxy(Class protocol, long clientVersion, InetSocketAddress addr, UserGroupInformation ticket, Configuration conf, SocketFactory factory, int rpcTimeout, RetryPolicy connectionRetryPolicy ) throws IOException { + return getProxy(protocol, clientVersion, addr, ticket, conf, factory, + rpcTimeout, connectionRetryPolicy, null); + } + + @Override + @SuppressWarnings("unchecked") + public ProtocolProxy getProxy(Class protocol, long clientVersion, + InetSocketAddress addr, UserGroupInformation ticket, Configuration conf, + SocketFactory factory, int rpcTimeout, RetryPolicy connectionRetryPolicy, + AtomicBoolean fallbackToSimpleAuth) throws IOException { final Invoker invoker = new Invoker(protocol, addr, ticket, conf, factory, - rpcTimeout, connectionRetryPolicy); + rpcTimeout, connectionRetryPolicy, fallbackToSimpleAuth); return new ProtocolProxy(protocol, (T) Proxy.newProxyInstance( protocol.getClassLoader(), new Class[]{protocol}, invoker), false); } @@ -115,13 +125,16 @@ public class ProtobufRpcEngine implements RpcEngine { private final Client client; private final long clientProtocolVersion; private final String protocolName; + private AtomicBoolean fallbackToSimpleAuth; private Invoker(Class protocol, InetSocketAddress addr, UserGroupInformation ticket, Configuration conf, SocketFactory factory, - int rpcTimeout, RetryPolicy connectionRetryPolicy) throws IOException { + int rpcTimeout, RetryPolicy connectionRetryPolicy, + AtomicBoolean fallbackToSimpleAuth) throws IOException { this(protocol, Client.ConnectionId.getConnectionId( addr, protocol, ticket, rpcTimeout, connectionRetryPolicy, conf), conf, factory); + this.fallbackToSimpleAuth = fallbackToSimpleAuth; } /** @@ -217,7 +230,8 @@ public class ProtobufRpcEngine implements RpcEngine { final RpcResponseWrapper val; try { val = (RpcResponseWrapper) client.call(RPC.RpcKind.RPC_PROTOCOL_BUFFER, - new RpcRequestWrapper(rpcRequestHeader, theRequest), remoteId); + new RpcRequestWrapper(rpcRequestHeader, theRequest), remoteId, + fallbackToSimpleAuth); } catch (Throwable e) { if (LOG.isTraceEnabled()) { diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java index 4ae7956c68e..40f6515e4a0 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java @@ -33,6 +33,7 @@ import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.HashMap; +import java.util.concurrent.atomic.AtomicBoolean; import javax.net.SocketFactory; @@ -524,6 +525,7 @@ public class RPC { * @param conf configuration * @param factory socket factory * @param rpcTimeout max time for each rpc; 0 means no timeout + * @param connectionRetryPolicy retry policy * @return the proxy * @throws IOException if any error occurs */ @@ -535,11 +537,43 @@ public class RPC { SocketFactory factory, int rpcTimeout, RetryPolicy connectionRetryPolicy) throws IOException { + return getProtocolProxy(protocol, clientVersion, addr, ticket, + conf, factory, rpcTimeout, connectionRetryPolicy, null); + } + + /** + * Get a protocol proxy that contains a proxy connection to a remote server + * and a set of methods that are supported by the server + * + * @param protocol protocol + * @param clientVersion client's version + * @param addr server address + * @param ticket security ticket + * @param conf configuration + * @param factory socket factory + * @param rpcTimeout max time for each rpc; 0 means no timeout + * @param connectionRetryPolicy retry policy + * @param fallbackToSimpleAuth set to true or false during calls to indicate if + * a secure client falls back to simple auth + * @return the proxy + * @throws IOException if any error occurs + */ + public static ProtocolProxy getProtocolProxy(Class protocol, + long clientVersion, + InetSocketAddress addr, + UserGroupInformation ticket, + Configuration conf, + SocketFactory factory, + int rpcTimeout, + RetryPolicy connectionRetryPolicy, + AtomicBoolean fallbackToSimpleAuth) + throws IOException { if (UserGroupInformation.isSecurityEnabled()) { SaslRpcServer.init(conf); } - return getProtocolEngine(protocol,conf).getProxy(protocol, clientVersion, - addr, ticket, conf, factory, rpcTimeout, connectionRetryPolicy); + return getProtocolEngine(protocol, conf).getProxy(protocol, clientVersion, + addr, ticket, conf, factory, rpcTimeout, connectionRetryPolicy, + fallbackToSimpleAuth); } /** diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcEngine.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcEngine.java index a8280bd2edf..047722e649e 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcEngine.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcEngine.java @@ -20,6 +20,7 @@ package org.apache.hadoop.ipc; import java.io.IOException; import java.net.InetSocketAddress; +import java.util.concurrent.atomic.AtomicBoolean; import javax.net.SocketFactory; @@ -43,6 +44,14 @@ public interface RpcEngine { SocketFactory factory, int rpcTimeout, RetryPolicy connectionRetryPolicy) throws IOException; + /** Construct a client-side proxy object. */ + ProtocolProxy getProxy(Class protocol, + long clientVersion, InetSocketAddress addr, + UserGroupInformation ticket, Configuration conf, + SocketFactory factory, int rpcTimeout, + RetryPolicy connectionRetryPolicy, + AtomicBoolean fallbackToSimpleAuth) throws IOException; + /** * Construct a server for a protocol implementation instance. * diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java index 4b2dfe0de10..c2d9435908a 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java @@ -24,6 +24,7 @@ import java.lang.reflect.InvocationTargetException; import java.net.InetSocketAddress; import java.io.*; +import java.util.concurrent.atomic.AtomicBoolean; import javax.net.SocketFactory; @@ -212,14 +213,17 @@ public class WritableRpcEngine implements RpcEngine { private Client.ConnectionId remoteId; private Client client; private boolean isClosed = false; + private final AtomicBoolean fallbackToSimpleAuth; public Invoker(Class protocol, InetSocketAddress address, UserGroupInformation ticket, Configuration conf, SocketFactory factory, - int rpcTimeout) throws IOException { + int rpcTimeout, AtomicBoolean fallbackToSimpleAuth) + throws IOException { this.remoteId = Client.ConnectionId.getConnectionId(address, protocol, ticket, rpcTimeout, conf); this.client = CLIENTS.getClient(conf, factory); + this.fallbackToSimpleAuth = fallbackToSimpleAuth; } @Override @@ -238,7 +242,8 @@ public class WritableRpcEngine implements RpcEngine { ObjectWritable value; try { value = (ObjectWritable) - client.call(RPC.RpcKind.RPC_WRITABLE, new Invocation(method, args), remoteId); + client.call(RPC.RpcKind.RPC_WRITABLE, new Invocation(method, args), + remoteId, fallbackToSimpleAuth); } finally { if (traceScope != null) traceScope.close(); } @@ -275,11 +280,25 @@ public class WritableRpcEngine implements RpcEngine { * talking to a server at the named address. * @param */ @Override - @SuppressWarnings("unchecked") public ProtocolProxy getProxy(Class protocol, long clientVersion, InetSocketAddress addr, UserGroupInformation ticket, Configuration conf, SocketFactory factory, int rpcTimeout, RetryPolicy connectionRetryPolicy) + throws IOException { + return getProxy(protocol, clientVersion, addr, ticket, conf, factory, + rpcTimeout, connectionRetryPolicy, null); + } + + /** Construct a client-side proxy object that implements the named protocol, + * talking to a server at the named address. + * @param */ + @Override + @SuppressWarnings("unchecked") + public ProtocolProxy getProxy(Class protocol, long clientVersion, + InetSocketAddress addr, UserGroupInformation ticket, + Configuration conf, SocketFactory factory, + int rpcTimeout, RetryPolicy connectionRetryPolicy, + AtomicBoolean fallbackToSimpleAuth) throws IOException { if (connectionRetryPolicy != null) { @@ -289,7 +308,7 @@ public class WritableRpcEngine implements RpcEngine { T proxy = (T) Proxy.newProxyInstance(protocol.getClassLoader(), new Class[] { protocol }, new Invoker(protocol, addr, ticket, conf, - factory, rpcTimeout)); + factory, rpcTimeout, fallbackToSimpleAuth)); return new ProtocolProxy(protocol, proxy, true); } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java index f0e389ff5de..c1b1bfb902b 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java @@ -276,12 +276,22 @@ public class TestRPC { */ private static class StoppedRpcEngine implements RpcEngine { - @SuppressWarnings("unchecked") @Override public ProtocolProxy getProxy(Class protocol, long clientVersion, InetSocketAddress addr, UserGroupInformation ticket, Configuration conf, SocketFactory factory, int rpcTimeout, RetryPolicy connectionRetryPolicy ) throws IOException { + return getProxy(protocol, clientVersion, addr, ticket, conf, factory, + rpcTimeout, connectionRetryPolicy, null); + } + + @SuppressWarnings("unchecked") + @Override + public ProtocolProxy getProxy(Class protocol, long clientVersion, + InetSocketAddress addr, UserGroupInformation ticket, Configuration conf, + SocketFactory factory, int rpcTimeout, + RetryPolicy connectionRetryPolicy, AtomicBoolean fallbackToSimpleAuth + ) throws IOException { T proxy = (T) Proxy.newProxyInstance(protocol.getClassLoader(), new Class[] { protocol }, new StoppedInvocationHandler()); return new ProtocolProxy(protocol, proxy, false); diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index fa9bbcdad27..9babe96f51f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -900,6 +900,9 @@ Release 2.6.0 - UNRELEASED HDFS-7105. Fix TestJournalNode#testFailToStartWithBadConfig to match log output change. (Ray Chiang via cnauroth) + HDFS-7105. Allow falling back to a non-SASL connection on + DataTransferProtocol in several edge cases. (cnauroth) + Release 2.5.1 - 2014-09-05 INCOMPATIBLE CHANGES diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java index 3f978fb27c1..ed08be05d8f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java @@ -22,8 +22,6 @@ import static org.apache.hadoop.crypto.key.KeyProviderCryptoExtension .EncryptedKeyVersion; import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_CRYPTO_CODEC_CLASSES_KEY_PREFIX; import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_CRYPTO_CIPHER_SUITE_KEY; -import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT; -import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT; @@ -90,6 +88,7 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Random; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -616,13 +615,15 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, DFSConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY, DFSConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_DEFAULT); NameNodeProxies.ProxyAndInfo proxyInfo = null; + AtomicBoolean nnFallbackToSimpleAuth = new AtomicBoolean(false); if (numResponseToDrop > 0) { // This case is used for testing. LOG.warn(DFSConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY + " is set to " + numResponseToDrop + ", this hacked client will proactively drop responses"); proxyInfo = NameNodeProxies.createProxyWithLossyRetryHandler(conf, - nameNodeUri, ClientProtocol.class, numResponseToDrop); + nameNodeUri, ClientProtocol.class, numResponseToDrop, + nnFallbackToSimpleAuth); } if (proxyInfo != null) { @@ -637,7 +638,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, Preconditions.checkArgument(nameNodeUri != null, "null URI"); proxyInfo = NameNodeProxies.createProxy(conf, nameNodeUri, - ClientProtocol.class); + ClientProtocol.class, nnFallbackToSimpleAuth); this.dtService = proxyInfo.getDelegationTokenService(); this.namenode = proxyInfo.getProxy(); } @@ -675,10 +676,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, } this.saslClient = new SaslDataTransferClient( DataTransferSaslUtil.getSaslPropertiesResolver(conf), - TrustedChannelResolver.getInstance(conf), - conf.getBoolean( - IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY, - IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT)); + TrustedChannelResolver.getInstance(conf), nnFallbackToSimpleAuth); } /** @@ -3113,4 +3111,13 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, public void setKeyProvider(KeyProviderCryptoExtension provider) { this.provider = provider; } + + /** + * Returns the SaslDataTransferClient configured for this DFSClient. + * + * @return SaslDataTransferClient configured for this DFSClient + */ + public SaslDataTransferClient getSaslDataTransferClient() { + return saslClient; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index 609b4c6025f..3c5358f7213 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -589,6 +589,7 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final String DFS_DATA_ENCRYPTION_ALGORITHM_KEY = "dfs.encrypt.data.transfer.algorithm"; public static final String DFS_TRUSTEDCHANNEL_RESOLVER_CLASS = "dfs.trustedchannel.resolver.class"; public static final String DFS_DATA_TRANSFER_PROTECTION_KEY = "dfs.data.transfer.protection"; + public static final String DFS_DATA_TRANSFER_PROTECTION_DEFAULT = ""; public static final String DFS_DATA_TRANSFER_SASL_PROPS_RESOLVER_CLASS_KEY = "dfs.data.transfer.saslproperties.resolver.class"; public static final int DFS_NAMENODE_LIST_ENCRYPTION_ZONES_NUM_RESPONSES_DEFAULT = 100; public static final String DFS_NAMENODE_LIST_ENCRYPTION_ZONES_NUM_RESPONSES = "dfs.namenode.list.encryption.zones.num.responses"; @@ -703,4 +704,7 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final int DFS_NAMENODE_INOTIFY_MAX_EVENTS_PER_RPC_DEFAULT = 1000; + public static final String IGNORE_SECURE_PORTS_FOR_TESTING_KEY = + "ignore.secure.ports.for.testing"; + public static final boolean IGNORE_SECURE_PORTS_FOR_TESTING_DEFAULT = false; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java index 90acedea12c..f91f7094bb4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java @@ -244,7 +244,7 @@ public class HAUtil { // Create the proxy provider. Actual proxy is not created. AbstractNNFailoverProxyProvider provider = NameNodeProxies .createFailoverProxyProvider(conf, nameNodeUri, ClientProtocol.class, - false); + false, null); // No need to use logical URI since failover is not configured. if (provider == null) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java index 17653345ef9..fcc2f5fdb69 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java @@ -36,6 +36,7 @@ import java.net.URI; import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -145,13 +146,37 @@ public class NameNodeProxies { @SuppressWarnings("unchecked") public static ProxyAndInfo createProxy(Configuration conf, URI nameNodeUri, Class xface) throws IOException { + return createProxy(conf, nameNodeUri, xface, null); + } + + /** + * Creates the namenode proxy with the passed protocol. This will handle + * creation of either HA- or non-HA-enabled proxy objects, depending upon + * if the provided URI is a configured logical URI. + * + * @param conf the configuration containing the required IPC + * properties, client failover configurations, etc. + * @param nameNodeUri the URI pointing either to a specific NameNode + * or to a logical nameservice. + * @param xface the IPC interface which should be created + * @param fallbackToSimpleAuth set to true or false during calls to indicate if + * a secure client falls back to simple auth + * @return an object containing both the proxy and the associated + * delegation token service it corresponds to + * @throws IOException if there is an error creating the proxy + **/ + @SuppressWarnings("unchecked") + public static ProxyAndInfo createProxy(Configuration conf, + URI nameNodeUri, Class xface, AtomicBoolean fallbackToSimpleAuth) + throws IOException { AbstractNNFailoverProxyProvider failoverProxyProvider = - createFailoverProxyProvider(conf, nameNodeUri, xface, true); + createFailoverProxyProvider(conf, nameNodeUri, xface, true, + fallbackToSimpleAuth); if (failoverProxyProvider == null) { // Non-HA case return createNonHAProxy(conf, NameNode.getAddress(nameNodeUri), xface, - UserGroupInformation.getCurrentUser(), true); + UserGroupInformation.getCurrentUser(), true, fallbackToSimpleAuth); } else { // HA case Conf config = new Conf(conf); @@ -187,6 +212,8 @@ public class NameNodeProxies { * or to a logical nameservice. * @param xface the IPC interface which should be created * @param numResponseToDrop The number of responses to drop for each RPC call + * @param fallbackToSimpleAuth set to true or false during calls to indicate if + * a secure client falls back to simple auth * @return an object containing both the proxy and the associated * delegation token service it corresponds to. Will return null of the * given configuration does not support HA. @@ -195,10 +222,12 @@ public class NameNodeProxies { @SuppressWarnings("unchecked") public static ProxyAndInfo createProxyWithLossyRetryHandler( Configuration config, URI nameNodeUri, Class xface, - int numResponseToDrop) throws IOException { + int numResponseToDrop, AtomicBoolean fallbackToSimpleAuth) + throws IOException { Preconditions.checkArgument(numResponseToDrop > 0); AbstractNNFailoverProxyProvider failoverProxyProvider = - createFailoverProxyProvider(config, nameNodeUri, xface, true); + createFailoverProxyProvider(config, nameNodeUri, xface, true, + fallbackToSimpleAuth); if (failoverProxyProvider != null) { // HA case int delay = config.getInt( @@ -257,12 +286,35 @@ public class NameNodeProxies { public static ProxyAndInfo createNonHAProxy( Configuration conf, InetSocketAddress nnAddr, Class xface, UserGroupInformation ugi, boolean withRetries) throws IOException { + return createNonHAProxy(conf, nnAddr, xface, ugi, withRetries, null); + } + + /** + * Creates an explicitly non-HA-enabled proxy object. Most of the time you + * don't want to use this, and should instead use {@link NameNodeProxies#createProxy}. + * + * @param conf the configuration object + * @param nnAddr address of the remote NN to connect to + * @param xface the IPC interface which should be created + * @param ugi the user who is making the calls on the proxy object + * @param withRetries certain interfaces have a non-standard retry policy + * @param fallbackToSimpleAuth - set to true or false during this method to + * indicate if a secure client falls back to simple auth + * @return an object containing both the proxy and the associated + * delegation token service it corresponds to + * @throws IOException + */ + @SuppressWarnings("unchecked") + public static ProxyAndInfo createNonHAProxy( + Configuration conf, InetSocketAddress nnAddr, Class xface, + UserGroupInformation ugi, boolean withRetries, + AtomicBoolean fallbackToSimpleAuth) throws IOException { Text dtService = SecurityUtil.buildTokenService(nnAddr); T proxy; if (xface == ClientProtocol.class) { proxy = (T) createNNProxyWithClientProtocol(nnAddr, conf, ugi, - withRetries); + withRetries, fallbackToSimpleAuth); } else if (xface == JournalProtocol.class) { proxy = (T) createNNProxyWithJournalProtocol(nnAddr, conf, ugi); } else if (xface == NamenodeProtocol.class) { @@ -351,7 +403,8 @@ public class NameNodeProxies { private static ClientProtocol createNNProxyWithClientProtocol( InetSocketAddress address, Configuration conf, UserGroupInformation ugi, - boolean withRetries) throws IOException { + boolean withRetries, AtomicBoolean fallbackToSimpleAuth) + throws IOException { RPC.setProtocolEngine(conf, ClientNamenodeProtocolPB.class, ProtobufRpcEngine.class); final RetryPolicy defaultPolicy = @@ -367,8 +420,8 @@ public class NameNodeProxies { ClientNamenodeProtocolPB proxy = RPC.getProtocolProxy( ClientNamenodeProtocolPB.class, version, address, ugi, conf, NetUtils.getDefaultSocketFactory(conf), - org.apache.hadoop.ipc.Client.getTimeout(conf), defaultPolicy) - .getProxy(); + org.apache.hadoop.ipc.Client.getTimeout(conf), defaultPolicy, + fallbackToSimpleAuth).getProxy(); if (withRetries) { // create the proxy with retries @@ -440,8 +493,8 @@ public class NameNodeProxies { /** Creates the Failover proxy provider instance*/ @VisibleForTesting public static AbstractNNFailoverProxyProvider createFailoverProxyProvider( - Configuration conf, URI nameNodeUri, Class xface, boolean checkPort) - throws IOException { + Configuration conf, URI nameNodeUri, Class xface, boolean checkPort, + AtomicBoolean fallbackToSimpleAuth) throws IOException { Class> failoverProxyProviderClass = null; AbstractNNFailoverProxyProvider providerNN; Preconditions.checkArgument( @@ -490,6 +543,7 @@ public class NameNodeProxies { + " and does not use port information."); } } + providerNN.setFallbackToSimpleAuth(fallbackToSimpleAuth); return providerNN; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferClient.java index 643af4a9fb1..9df9929dfd4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferClient.java @@ -28,6 +28,7 @@ import java.io.OutputStream; import java.net.InetAddress; import java.net.Socket; import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; import javax.security.auth.callback.Callback; import javax.security.auth.callback.CallbackHandler; @@ -71,21 +72,38 @@ public class SaslDataTransferClient { private static final Logger LOG = LoggerFactory.getLogger( SaslDataTransferClient.class); - private final boolean fallbackToSimpleAuthAllowed; + private final AtomicBoolean fallbackToSimpleAuth; private final SaslPropertiesResolver saslPropsResolver; private final TrustedChannelResolver trustedChannelResolver; + /** + * Creates a new SaslDataTransferClient. This constructor is used in cases + * where it is not relevant to track if a secure client did a fallback to + * simple auth. For intra-cluster connections between data nodes in the same + * cluster, we can assume that all run under the same security configuration. + * + * @param saslPropsResolver for determining properties of SASL negotiation + * @param trustedChannelResolver for identifying trusted connections that do + * not require SASL negotiation + */ + public SaslDataTransferClient(SaslPropertiesResolver saslPropsResolver, + TrustedChannelResolver trustedChannelResolver) { + this(saslPropsResolver, trustedChannelResolver, null); + } + /** * Creates a new SaslDataTransferClient. * * @param saslPropsResolver for determining properties of SASL negotiation * @param trustedChannelResolver for identifying trusted connections that do * not require SASL negotiation + * @param fallbackToSimpleAuth checked on each attempt at general SASL + * handshake, if true forces use of simple auth */ public SaslDataTransferClient(SaslPropertiesResolver saslPropsResolver, TrustedChannelResolver trustedChannelResolver, - boolean fallbackToSimpleAuthAllowed) { - this.fallbackToSimpleAuthAllowed = fallbackToSimpleAuthAllowed; + AtomicBoolean fallbackToSimpleAuth) { + this.fallbackToSimpleAuth = fallbackToSimpleAuth; this.saslPropsResolver = saslPropsResolver; this.trustedChannelResolver = trustedChannelResolver; } @@ -221,22 +239,26 @@ public class SaslDataTransferClient { "SASL client skipping handshake in secured configuration with " + "privileged port for addr = {}, datanodeId = {}", addr, datanodeId); return null; - } else if (accessToken.getIdentifier().length == 0) { - if (!fallbackToSimpleAuthAllowed) { - throw new IOException( - "No block access token was provided (insecure cluster), but this " + - "client is configured to allow only secure connections."); - } + } else if (fallbackToSimpleAuth != null && fallbackToSimpleAuth.get()) { LOG.debug( "SASL client skipping handshake in secured configuration with " + "unsecured cluster for addr = {}, datanodeId = {}", addr, datanodeId); return null; - } else { + } else if (saslPropsResolver != null) { LOG.debug( "SASL client doing general handshake for addr = {}, datanodeId = {}", addr, datanodeId); return getSaslStreams(addr, underlyingOut, underlyingIn, accessToken, datanodeId); + } else { + // It's a secured cluster using non-privileged ports, but no SASL. The + // only way this can happen is if the DataNode has + // ignore.secure.ports.for.testing configured, so this is a rare edge case. + LOG.debug( + "SASL client skipping handshake in secured configuration with no SASL " + + "protection configured for addr = {}, datanodeId = {}", + addr, datanodeId); + return null; } } @@ -348,12 +370,6 @@ public class SaslDataTransferClient { OutputStream underlyingOut, InputStream underlyingIn, Token accessToken, DatanodeID datanodeId) throws IOException { - if (saslPropsResolver == null) { - throw new IOException(String.format("Cannot create a secured " + - "connection if DataNode listens on unprivileged port (%d) and no " + - "protection is defined in configuration property %s.", - datanodeId.getXferPort(), DFS_DATA_TRANSFER_PROTECTION_KEY)); - } Map saslProps = saslPropsResolver.getClientProperties(addr); String userName = buildUserName(accessToken); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferServer.java index 78570579323..2b82c82f26a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferServer.java @@ -112,11 +112,29 @@ public class SaslDataTransferServer { "SASL server skipping handshake in unsecured configuration for " + "peer = {}, datanodeId = {}", peer, datanodeId); return new IOStreamPair(underlyingIn, underlyingOut); - } else { + } else if (dnConf.getSaslPropsResolver() != null) { LOG.debug( "SASL server doing general handshake for peer = {}, datanodeId = {}", peer, datanodeId); return getSaslStreams(peer, underlyingOut, underlyingIn, datanodeId); + } else if (dnConf.getIgnoreSecurePortsForTesting()) { + // It's a secured cluster using non-privileged ports, but no SASL. The + // only way this can happen is if the DataNode has + // ignore.secure.ports.for.testing configured, so this is a rare edge case. + LOG.debug( + "SASL server skipping handshake in secured configuration with no SASL " + + "protection configured for peer = {}, datanodeId = {}", + peer, datanodeId); + return new IOStreamPair(underlyingIn, underlyingOut); + } else { + // The error message here intentionally does not mention + // ignore.secure.ports.for.testing. That's intended for dev use only. + // This code path is not expected to execute ever, because DataNode startup + // checks for invalid configuration and aborts. + throw new IOException(String.format("Cannot create a secured " + + "connection if DataNode listens on unprivileged port (%d) and no " + + "protection is defined in configuration property %s.", + datanodeId.getXferPort(), DFS_DATA_TRANSFER_PROTECTION_KEY)); } } @@ -257,12 +275,6 @@ public class SaslDataTransferServer { private IOStreamPair getSaslStreams(Peer peer, OutputStream underlyingOut, InputStream underlyingIn, final DatanodeID datanodeId) throws IOException { SaslPropertiesResolver saslPropsResolver = dnConf.getSaslPropsResolver(); - if (saslPropsResolver == null) { - throw new IOException(String.format("Cannot create a secured " + - "connection if DataNode listens on unprivileged port (%d) and no " + - "protection is defined in configuration property %s.", - datanodeId.getXferPort(), DFS_DATA_TRANSFER_PROTECTION_KEY)); - } Map saslProps = saslPropsResolver.getServerProperties( getPeerAddress(peer)); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java index df6aa996848..cea1ab71150 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java @@ -48,8 +48,6 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.CommonConfigurationKeys; -import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.StorageType; @@ -787,12 +785,9 @@ public class Dispatcher { : Executors.newFixedThreadPool(dispatcherThreads); this.maxConcurrentMovesPerNode = maxConcurrentMovesPerNode; - final boolean fallbackToSimpleAuthAllowed = conf.getBoolean( - CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY, - CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT); this.saslClient = new SaslDataTransferClient( DataTransferSaslUtil.getSaslPropertiesResolver(conf), - TrustedChannelResolver.getInstance(conf), fallbackToSimpleAuthAllowed); + TrustedChannelResolver.getInstance(conf), nnc.fallbackToSimpleAuth); } public DistributedFileSystem getDistributedFileSystem() { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java index d27f33f0e87..91625314d79 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java @@ -27,6 +27,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -101,6 +102,7 @@ public class NameNodeConnector implements Closeable { private final NamenodeProtocol namenode; private final ClientProtocol client; private final KeyManager keyManager; + final AtomicBoolean fallbackToSimpleAuth = new AtomicBoolean(false); private final DistributedFileSystem fs; private final Path idPath; @@ -120,7 +122,7 @@ public class NameNodeConnector implements Closeable { this.namenode = NameNodeProxies.createProxy(conf, nameNodeUri, NamenodeProtocol.class).getProxy(); this.client = NameNodeProxies.createProxy(conf, nameNodeUri, - ClientProtocol.class).getProxy(); + ClientProtocol.class, fallbackToSimpleAuth).getProxy(); this.fs = (DistributedFileSystem)FileSystem.get(nameNodeUri, conf); final NamespaceInfo namespaceinfo = namenode.versionRequest(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java index 4a36472cb00..31276825603 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java @@ -48,6 +48,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_DEF import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_ENCRYPTION_ALGORITHM_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_RESTART_REPLICA_EXPIRY_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_RESTART_REPLICA_EXPIRY_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.IGNORE_SECURE_PORTS_FOR_TESTING_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.IGNORE_SECURE_PORTS_FOR_TESTING_DEFAULT; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.DFSConfigKeys; @@ -90,6 +92,7 @@ public class DNConf { final String encryptionAlgorithm; final SaslPropertiesResolver saslPropsResolver; final TrustedChannelResolver trustedChannelResolver; + private final boolean ignoreSecurePortsForTesting; final long xceiverStopTimeout; final long restartReplicaExpiry; @@ -173,6 +176,9 @@ public class DNConf { this.trustedChannelResolver = TrustedChannelResolver.getInstance(conf); this.saslPropsResolver = DataTransferSaslUtil.getSaslPropertiesResolver( conf); + this.ignoreSecurePortsForTesting = conf.getBoolean( + IGNORE_SECURE_PORTS_FOR_TESTING_KEY, + IGNORE_SECURE_PORTS_FOR_TESTING_DEFAULT); this.xceiverStopTimeout = conf.getLong( DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_KEY, @@ -238,4 +244,15 @@ public class DNConf { public TrustedChannelResolver getTrustedChannelResolver() { return trustedChannelResolver; } + + /** + * Returns true if configuration is set to skip checking for proper + * port configuration in a secured cluster. This is only intended for use in + * dev testing. + * + * @return true if configured to skip checking secured port configuration + */ + public boolean getIgnoreSecurePortsForTesting() { + return ignoreSecurePortsForTesting; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index 57ff2fab318..b1ef18673ae 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@ -17,8 +17,6 @@ */ package org.apache.hadoop.hdfs.server.datanode; -import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT; -import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ADMIN; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_ADDRESS_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY; @@ -46,9 +44,12 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PLUGINS_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_STARTUP_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_TRANSFER_PROTECTION_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_MAX_NUM_BLOCKS_TO_LOG_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_MAX_NUM_BLOCKS_TO_LOG_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.IGNORE_SECURE_PORTS_FOR_TESTING_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.IGNORE_SECURE_PORTS_FOR_TESTING_KEY; import static org.apache.hadoop.util.ExitUtil.terminate; import java.io.BufferedOutputStream; @@ -170,6 +171,7 @@ import org.apache.hadoop.net.DNS; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.unix.DomainSocket; import org.apache.hadoop.security.AccessControlException; +import org.apache.hadoop.security.SaslPropertiesResolver; import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod; @@ -967,8 +969,6 @@ public class DataNode extends ReconfigurableBase SecureResources resources ) throws IOException { - checkSecureConfig(conf, resources); - // settings global for all BPs in the Data Node this.secureResources = resources; synchronized (this) { @@ -976,6 +976,8 @@ public class DataNode extends ReconfigurableBase } this.conf = conf; this.dnConf = new DNConf(conf); + checkSecureConfig(dnConf, conf, resources); + this.spanReceiverHost = SpanReceiverHost.getInstance(conf); if (dnConf.maxLockedMemory > 0) { @@ -1031,10 +1033,7 @@ public class DataNode extends ReconfigurableBase // exit without having to explicitly shutdown its thread pool. readaheadPool = ReadaheadPool.getInstance(); saslClient = new SaslDataTransferClient(dnConf.saslPropsResolver, - dnConf.trustedChannelResolver, - conf.getBoolean( - IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY, - IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT)); + dnConf.trustedChannelResolver); saslServer = new SaslDataTransferServer(dnConf, blockPoolTokenSecretManager); } @@ -1054,23 +1053,24 @@ public class DataNode extends ReconfigurableBase * must check if the target port is a privileged port, and if so, skip the * SASL handshake. * + * @param dnConf DNConf to check * @param conf Configuration to check * @param resources SecuredResources obtained for DataNode * @throws RuntimeException if security enabled, but configuration is insecure */ - private static void checkSecureConfig(Configuration conf, + private static void checkSecureConfig(DNConf dnConf, Configuration conf, SecureResources resources) throws RuntimeException { if (!UserGroupInformation.isSecurityEnabled()) { return; } - String dataTransferProtection = conf.get(DFS_DATA_TRANSFER_PROTECTION_KEY); - if (resources != null && dataTransferProtection == null) { + SaslPropertiesResolver saslPropsResolver = dnConf.getSaslPropsResolver(); + if (resources != null && saslPropsResolver == null) { return; } - if (conf.getBoolean("ignore.secure.ports.for.testing", false)) { + if (dnConf.getIgnoreSecurePortsForTesting()) { return; } - if (dataTransferProtection != null && + if (saslPropsResolver != null && DFSUtil.getHttpPolicy(conf) == HttpConfig.Policy.HTTPS_ONLY && resources == null) { return; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java index 5cc8a4797e0..a1871233cd4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java @@ -17,9 +17,6 @@ */ package org.apache.hadoop.hdfs.server.namenode; -import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT; -import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY; - import java.io.FileNotFoundException; import java.io.IOException; import java.io.OutputStream; @@ -59,10 +56,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus; -import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver; -import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil; import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory; -import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey; import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy; @@ -161,7 +155,6 @@ public class NamenodeFsck implements DataEncryptionKeyFactory { private List snapshottableDirs = null; private final BlockPlacementPolicy bpPolicy; - private final SaslDataTransferClient saslClient; /** * Filesystem checker. @@ -188,12 +181,6 @@ public class NamenodeFsck implements DataEncryptionKeyFactory { networktopology, namenode.getNamesystem().getBlockManager().getDatanodeManager() .getHost2DatanodeMap()); - this.saslClient = new SaslDataTransferClient( - DataTransferSaslUtil.getSaslPropertiesResolver(conf), - TrustedChannelResolver.getInstance(conf), - conf.getBoolean( - IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY, - IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT)); for (Iterator it = pmap.keySet().iterator(); it.hasNext();) { String key = it.next(); @@ -594,7 +581,7 @@ public class NamenodeFsck implements DataEncryptionKeyFactory { * bad. Both places should be refactored to provide a method to copy blocks * around. */ - private void copyBlock(DFSClient dfs, LocatedBlock lblock, + private void copyBlock(final DFSClient dfs, LocatedBlock lblock, OutputStream fos) throws Exception { int failures = 0; InetSocketAddress targetAddr = null; @@ -647,8 +634,9 @@ public class NamenodeFsck implements DataEncryptionKeyFactory { try { s.connect(addr, HdfsServerConstants.READ_TIMEOUT); s.setSoTimeout(HdfsServerConstants.READ_TIMEOUT); - peer = TcpPeerServer.peerFromSocketAndKey(saslClient, s, - NamenodeFsck.this, blockToken, datanodeId); + peer = TcpPeerServer.peerFromSocketAndKey( + dfs.getSaslDataTransferClient(), s, NamenodeFsck.this, + blockToken, datanodeId); } finally { if (peer == null) { IOUtils.closeQuietly(s); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/AbstractNNFailoverProxyProvider.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/AbstractNNFailoverProxyProvider.java index 3c0edfd8f4a..08e82be59db 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/AbstractNNFailoverProxyProvider.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/AbstractNNFailoverProxyProvider.java @@ -18,12 +18,16 @@ package org.apache.hadoop.hdfs.server.namenode.ha; +import java.util.concurrent.atomic.AtomicBoolean; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.retry.FailoverProxyProvider; public abstract class AbstractNNFailoverProxyProvider implements FailoverProxyProvider { + protected AtomicBoolean fallbackToSimpleAuth; + /** * Inquire whether logical HA URI is used for the implementation. If it is * used, a special token handling may be needed to make sure a token acquired @@ -32,4 +36,14 @@ public abstract class AbstractNNFailoverProxyProvider implements * @return true if logical HA URI is used. false, if not used. */ public abstract boolean useLogicalURI(); + + /** + * Set for tracking if a secure client falls back to simple auth. + * + * @param fallbackToSimpleAuth - set to true or false during this method to + * indicate if a secure client falls back to simple auth + */ + public void setFallbackToSimpleAuth(AtomicBoolean fallbackToSimpleAuth) { + this.fallbackToSimpleAuth = fallbackToSimpleAuth; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ConfiguredFailoverProxyProvider.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ConfiguredFailoverProxyProvider.java index 4d196a2adef..06aa8fafcd0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ConfiguredFailoverProxyProvider.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ConfiguredFailoverProxyProvider.java @@ -122,7 +122,7 @@ public class ConfiguredFailoverProxyProvider extends if (current.namenode == null) { try { current.namenode = NameNodeProxies.createNonHAProxy(conf, - current.address, xface, ugi, false).getProxy(); + current.address, xface, ugi, false, fallbackToSimpleAuth).getProxy(); } catch (IOException e) { LOG.error("Failed to create RPC proxy to NameNode", e); throw new RuntimeException(e); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/TestSaslDataTransfer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/TestSaslDataTransfer.java index 7602f44b0b0..0d860b4f064 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/TestSaslDataTransfer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/TestSaslDataTransfer.java @@ -18,6 +18,9 @@ package org.apache.hadoop.hdfs.protocol.datatransfer.sasl; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HTTP_POLICY_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.IGNORE_SECURE_PORTS_FOR_TESTING_KEY; + import static org.junit.Assert.*; import java.io.IOException; @@ -29,11 +32,13 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.http.HttpConfig; import org.apache.hadoop.io.IOUtils; import org.junit.After; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; +import org.junit.rules.Timeout; public class TestSaslDataTransfer extends SaslDataTransferTestCase { @@ -49,6 +54,9 @@ public class TestSaslDataTransfer extends SaslDataTransferTestCase { @Rule public ExpectedException exception = ExpectedException.none(); + @Rule + public Timeout timeout = new Timeout(60000); + @After public void shutdown() { IOUtils.cleanup(null, fs); @@ -98,17 +106,6 @@ public class TestSaslDataTransfer extends SaslDataTransferTestCase { doTest(clientConf); } - @Test - public void testClientSaslNoServerSasl() throws Exception { - HdfsConfiguration clusterConf = createSecureConfig(""); - startCluster(clusterConf); - HdfsConfiguration clientConf = new HdfsConfiguration(clusterConf); - clientConf.set(DFS_DATA_TRANSFER_PROTECTION_KEY, "authentication"); - exception.expect(IOException.class); - exception.expectMessage("could only be replicated to 0 nodes"); - doTest(clientConf); - } - @Test public void testServerSaslNoClientSasl() throws Exception { HdfsConfiguration clusterConf = createSecureConfig( @@ -121,6 +118,32 @@ public class TestSaslDataTransfer extends SaslDataTransferTestCase { doTest(clientConf); } + @Test + public void testDataNodeAbortsIfNoSasl() throws Exception { + HdfsConfiguration clusterConf = createSecureConfig(""); + exception.expect(RuntimeException.class); + exception.expectMessage("Cannot start secure DataNode"); + startCluster(clusterConf); + } + + @Test + public void testDataNodeAbortsIfNotHttpsOnly() throws Exception { + HdfsConfiguration clusterConf = createSecureConfig("authentication"); + clusterConf.set(DFS_HTTP_POLICY_KEY, + HttpConfig.Policy.HTTP_AND_HTTPS.name()); + exception.expect(RuntimeException.class); + exception.expectMessage("Cannot start secure DataNode"); + startCluster(clusterConf); + } + + @Test + public void testNoSaslAndSecurePortsIgnored() throws Exception { + HdfsConfiguration clusterConf = createSecureConfig(""); + clusterConf.setBoolean(IGNORE_SECURE_PORTS_FOR_TESTING_KEY, true); + startCluster(clusterConf); + doTest(clusterConf); + } + /** * Tests DataTransferProtocol with the given client configuration. * diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java index 899b888f442..8f7d11ae7e6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java @@ -194,7 +194,7 @@ public class TestRetryCacheWithHA { URI nnUri = dfs.getUri(); FailoverProxyProvider failoverProxyProvider = NameNodeProxies.createFailoverProxyProvider(conf, - nnUri, ClientProtocol.class, true); + nnUri, ClientProtocol.class, true, null); InvocationHandler dummyHandler = new DummyRetryInvocationHandler( failoverProxyProvider, RetryPolicies .failoverOnNetworkException(RetryPolicies.TRY_ONCE_THEN_FAIL,