From ea8da4b8c006d155253186f869fc27d9f74c4a53 Mon Sep 17 00:00:00 2001 From: Chris Nauroth Date: Mon, 14 Jul 2014 18:28:02 +0000 Subject: [PATCH] HDFS-2856. Merging change r1610474 from trunk to branch-2. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1610479 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 + hadoop-hdfs-project/hadoop-hdfs/pom.xml | 5 + .../hadoop/hdfs/BlockReaderFactory.java | 3 +- .../org/apache/hadoop/hdfs/DFSClient.java | 109 ++-- .../org/apache/hadoop/hdfs/DFSConfigKeys.java | 2 + .../apache/hadoop/hdfs/DFSOutputStream.java | 25 +- .../apache/hadoop/hdfs/RemotePeerFactory.java | 10 +- .../apache/hadoop/hdfs/net/EncryptedPeer.java | 7 +- .../apache/hadoop/hdfs/net/TcpPeerServer.java | 19 +- .../datatransfer/DataTransferEncryptor.java | 506 ------------------ .../sasl/DataEncryptionKeyFactory.java | 38 ++ .../sasl/DataTransferSaslUtil.java | 267 +++++++++ .../sasl/InvalidMagicNumberException.java | 44 ++ .../sasl/SaslDataTransferClient.java | 439 +++++++++++++++ .../sasl/SaslDataTransferServer.java | 381 +++++++++++++ .../datatransfer/sasl/SaslParticipant.java | 166 ++++++ .../hadoop/hdfs/server/balancer/Balancer.java | 33 +- .../server/balancer/NameNodeConnector.java | 14 +- .../hadoop/hdfs/server/common/JspHelper.java | 18 +- .../hadoop/hdfs/server/datanode/DNConf.java | 46 +- .../hadoop/hdfs/server/datanode/DataNode.java | 127 +++-- .../hdfs/server/datanode/DataXceiver.java | 76 +-- .../server/datanode/DatanodeJspHelper.java | 17 +- .../hdfs/server/namenode/NamenodeFsck.java | 34 +- .../src/main/resources/hdfs-default.xml | 31 ++ .../hadoop/hdfs/BlockReaderTestUtil.java | 8 +- .../apache/hadoop/hdfs/MiniDFSCluster.java | 36 +- .../sasl/SaslDataTransferTestCase.java | 110 ++++ .../sasl/TestSaslDataTransfer.java | 155 ++++++ .../TestBalancerWithSaslDataTransfer.java | 41 ++ .../TestBlockTokenWithDFS.java | 4 +- .../datanode/TestDataNodeVolumeFailure.java | 6 +- 32 files changed, 2041 insertions(+), 739 deletions(-) delete mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferEncryptor.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/DataEncryptionKeyFactory.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/DataTransferSaslUtil.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/InvalidMagicNumberException.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferClient.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferServer.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslParticipant.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferTestCase.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/TestSaslDataTransfer.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithSaslDataTransfer.java diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 1866587de40..f2cc10b7046 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -34,6 +34,9 @@ Release 2.6.0 - UNRELEASED HDFS-3851. DFSOutputStream class code cleanup. (Jing Zhao via suresh) + HDFS-2856. Fix block protocol so that Datanodes don't require root or jsvc. + (cnauroth) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-hdfs-project/hadoop-hdfs/pom.xml b/hadoop-hdfs-project/hadoop-hdfs/pom.xml index 4c454e478af..d19ac38cee8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/pom.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/pom.xml @@ -145,6 +145,11 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd"> junit test + + org.apache.hadoop + hadoop-minikdc + test + org.mockito mockito-all diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java index 3355404c7a7..d27bd6ef0d2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java @@ -744,7 +744,8 @@ private BlockReaderPeer nextTcpPeer() throws IOException { } } try { - Peer peer = remotePeerFactory.newConnectedPeer(inetSocketAddress); + Peer peer = remotePeerFactory.newConnectedPeer(inetSocketAddress, token, + datanode); if (LOG.isTraceEnabled()) { LOG.trace("nextTcpPeer: created newConnectedPeer " + peer); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java index 14fbeef1794..9edb3db6585 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.hdfs; +import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT; +import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT; @@ -93,7 +95,6 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.BlockStorageLocation; @@ -136,6 +137,7 @@ import org.apache.hadoop.hdfs.protocol.ClientProtocol; import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks; import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException; +import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.DirectoryListing; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; @@ -153,16 +155,19 @@ import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport; import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus; import org.apache.hadoop.hdfs.protocol.UnresolvedPathException; -import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor; import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair; import org.apache.hadoop.hdfs.protocol.datatransfer.Op; import org.apache.hadoop.hdfs.protocol.datatransfer.ReplaceDatanodeOnFailure; import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver; import org.apache.hadoop.hdfs.protocol.datatransfer.Sender; +import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory; +import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil; +import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumResponseProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; import org.apache.hadoop.hdfs.protocolPB.PBHelper; +import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey; import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException; import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; @@ -209,7 +214,8 @@ * ********************************************************/ @InterfaceAudience.Private -public class DFSClient implements java.io.Closeable, RemotePeerFactory { +public class DFSClient implements java.io.Closeable, RemotePeerFactory, + DataEncryptionKeyFactory { public static final Log LOG = LogFactory.getLog(DFSClient.class); public static final long SERVER_DEFAULTS_VALIDITY_PERIOD = 60 * 60 * 1000L; // 1 hour static final int TCP_WINDOW_SIZE = 128 * 1024; // 128 KB @@ -233,7 +239,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory { private final Random r = new Random(); private SocketAddress[] localInterfaceAddrs; private DataEncryptionKey encryptionKey; - final TrustedChannelResolver trustedChannelResolver; + final SaslDataTransferClient saslClient; private final CachingStrategy defaultReadCachingStrategy; private final CachingStrategy defaultWriteCachingStrategy; private final ClientContext clientContext; @@ -635,7 +641,12 @@ public DFSClient(URI nameNodeUri, ClientProtocol rpcNamenode, if (numThreads > 0) { this.initThreadsNumForHedgedReads(numThreads); } - this.trustedChannelResolver = TrustedChannelResolver.getInstance(getConfiguration()); + this.saslClient = new SaslDataTransferClient( + DataTransferSaslUtil.getSaslPropertiesResolver(conf), + TrustedChannelResolver.getInstance(conf), + conf.getBoolean( + IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY, + IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT)); } /** @@ -1805,23 +1816,6 @@ public HdfsFileStatus getFileLinkInfo(String src) throws IOException { UnresolvedPathException.class); } } - - /** - * Get the checksum of the whole file of a range of the file. Note that the - * range always starts from the beginning of the file. - * @param src The file path - * @param length The length of the range - * @return The checksum - * @see DistributedFileSystem#getFileChecksum(Path) - */ - public MD5MD5CRC32FileChecksum getFileChecksum(String src, long length) - throws IOException { - checkOpen(); - Preconditions.checkArgument(length >= 0); - return getFileChecksum(src, length, clientName, namenode, - socketFactory, dfsClientConf.socketTimeout, getDataEncryptionKey(), - dfsClientConf.connectToDnViaHostname); - } @InterfaceAudience.Private public void clearDataEncryptionKey() { @@ -1841,11 +1835,9 @@ boolean shouldEncryptData() throws IOException { return d == null ? false : d.getEncryptDataTransfer(); } - @InterfaceAudience.Private - public DataEncryptionKey getDataEncryptionKey() - throws IOException { - if (shouldEncryptData() && - !this.trustedChannelResolver.isTrusted()) { + @Override + public DataEncryptionKey newDataEncryptionKey() throws IOException { + if (shouldEncryptData()) { synchronized (this) { if (encryptionKey == null || encryptionKey.expiryDate < Time.now()) { @@ -1860,22 +1852,17 @@ public DataEncryptionKey getDataEncryptionKey() } /** - * Get the checksum of the whole file or a range of the file. + * Get the checksum of the whole file of a range of the file. Note that the + * range always starts from the beginning of the file. * @param src The file path * @param length the length of the range, i.e., the range is [0, length] - * @param clientName the name of the client requesting the checksum. - * @param namenode the RPC proxy for the namenode - * @param socketFactory to create sockets to connect to DNs - * @param socketTimeout timeout to use when connecting and waiting for a response - * @param encryptionKey the key needed to communicate with DNs in this cluster - * @param connectToDnViaHostname whether the client should use hostnames instead of IPs * @return The checksum + * @see DistributedFileSystem#getFileChecksum(Path) */ - private static MD5MD5CRC32FileChecksum getFileChecksum(String src, - long length, String clientName, ClientProtocol namenode, - SocketFactory socketFactory, int socketTimeout, - DataEncryptionKey encryptionKey, boolean connectToDnViaHostname) + public MD5MD5CRC32FileChecksum getFileChecksum(String src, long length) throws IOException { + checkOpen(); + Preconditions.checkArgument(length >= 0); //get block locations for the file range LocatedBlocks blockLocations = callGetBlockLocations(namenode, src, 0, length); @@ -1910,7 +1897,7 @@ private static MD5MD5CRC32FileChecksum getFileChecksum(String src, final DatanodeInfo[] datanodes = lb.getLocations(); //try each datanode location of the block - final int timeout = 3000 * datanodes.length + socketTimeout; + final int timeout = 3000 * datanodes.length + dfsClientConf.socketTimeout; boolean done = false; for(int j = 0; !done && j < datanodes.length; j++) { DataOutputStream out = null; @@ -1918,8 +1905,7 @@ private static MD5MD5CRC32FileChecksum getFileChecksum(String src, try { //connect to a datanode - IOStreamPair pair = connectToDN(socketFactory, connectToDnViaHostname, - encryptionKey, datanodes[j], timeout); + IOStreamPair pair = connectToDN(datanodes[j], timeout, lb); out = new DataOutputStream(new BufferedOutputStream(pair.out, HdfsConstants.SMALL_BUFFER_SIZE)); in = new DataInputStream(pair.in); @@ -1975,9 +1961,7 @@ else if (bpc != bytesPerCRC) { } else { LOG.debug("Retrieving checksum from an earlier-version DataNode: " + "inferring checksum by reading first byte"); - ct = inferChecksumTypeByReading( - clientName, socketFactory, socketTimeout, lb, datanodes[j], - encryptionKey, connectToDnViaHostname); + ct = inferChecksumTypeByReading(lb, datanodes[j]); } if (i == 0) { // first block @@ -2051,16 +2035,13 @@ else if (bpc != bytesPerCRC) { * Connect to the given datanode's datantrasfer port, and return * the resulting IOStreamPair. This includes encryption wrapping, etc. */ - private static IOStreamPair connectToDN( - SocketFactory socketFactory, boolean connectToDnViaHostname, - DataEncryptionKey encryptionKey, DatanodeInfo dn, int timeout) - throws IOException - { + private IOStreamPair connectToDN(DatanodeInfo dn, int timeout, + LocatedBlock lb) throws IOException { boolean success = false; Socket sock = null; try { sock = socketFactory.createSocket(); - String dnAddr = dn.getXferAddr(connectToDnViaHostname); + String dnAddr = dn.getXferAddr(getConf().connectToDnViaHostname); if (LOG.isDebugEnabled()) { LOG.debug("Connecting to datanode " + dnAddr); } @@ -2069,13 +2050,8 @@ private static IOStreamPair connectToDN( OutputStream unbufOut = NetUtils.getOutputStream(sock); InputStream unbufIn = NetUtils.getInputStream(sock); - IOStreamPair ret; - if (encryptionKey != null) { - ret = DataTransferEncryptor.getEncryptedStreams( - unbufOut, unbufIn, encryptionKey); - } else { - ret = new IOStreamPair(unbufIn, unbufOut); - } + IOStreamPair ret = saslClient.newSocketSend(sock, unbufOut, unbufIn, this, + lb.getBlockToken(), dn); success = true; return ret; } finally { @@ -2091,21 +2067,14 @@ private static IOStreamPair connectToDN( * with older HDFS versions which did not include the checksum type in * OpBlockChecksumResponseProto. * - * @param in input stream from datanode - * @param out output stream to datanode * @param lb the located block - * @param clientName the name of the DFSClient requesting the checksum * @param dn the connected datanode * @return the inferred checksum type * @throws IOException if an error occurs */ - private static Type inferChecksumTypeByReading( - String clientName, SocketFactory socketFactory, int socketTimeout, - LocatedBlock lb, DatanodeInfo dn, - DataEncryptionKey encryptionKey, boolean connectToDnViaHostname) + private Type inferChecksumTypeByReading(LocatedBlock lb, DatanodeInfo dn) throws IOException { - IOStreamPair pair = connectToDN(socketFactory, connectToDnViaHostname, - encryptionKey, dn, socketTimeout); + IOStreamPair pair = connectToDN(dn, dfsClientConf.socketTimeout, lb); try { DataOutputStream out = new DataOutputStream(new BufferedOutputStream(pair.out, @@ -2858,7 +2827,9 @@ public void removeXAttr(String src, String name) throws IOException { } @Override // RemotePeerFactory - public Peer newConnectedPeer(InetSocketAddress addr) throws IOException { + public Peer newConnectedPeer(InetSocketAddress addr, + Token blockToken, DatanodeID datanodeId) + throws IOException { Peer peer = null; boolean success = false; Socket sock = null; @@ -2867,8 +2838,8 @@ public Peer newConnectedPeer(InetSocketAddress addr) throws IOException { NetUtils.connect(sock, addr, getRandomLocalInterfaceAddr(), dfsClientConf.socketTimeout); - peer = TcpPeerServer.peerFromSocketAndKey(sock, - getDataEncryptionKey()); + peer = TcpPeerServer.peerFromSocketAndKey(saslClient, sock, this, + blockToken, datanodeId); success = true; return peer; } finally { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index d41239c5639..aea6b72acef 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -565,6 +565,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final boolean DFS_ENCRYPT_DATA_TRANSFER_DEFAULT = false; public static final String DFS_DATA_ENCRYPTION_ALGORITHM_KEY = "dfs.encrypt.data.transfer.algorithm"; public static final String DFS_TRUSTEDCHANNEL_RESOLVER_CLASS = "dfs.trustedchannel.resolver.class"; + public static final String DFS_DATA_TRANSFER_PROTECTION_KEY = "dfs.data.transfer.protection"; + public static final String DFS_DATA_TRANSFER_SASL_PROPS_RESOLVER_CLASS_KEY = "dfs.data.transfer.saslproperties.resolver.class"; // Journal-node related configs. These are read on the JN side. public static final String DFS_JOURNALNODE_EDITS_DIR_KEY = "dfs.journalnode.edits.dir"; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java index 920ef9f4048..9d1acf668ce 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java @@ -61,7 +61,6 @@ import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException; import org.apache.hadoop.hdfs.protocol.UnresolvedPathException; import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage; -import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor; import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol; import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair; import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException; @@ -1047,14 +1046,10 @@ private void transfer(final DatanodeInfo src, final DatanodeInfo[] targets, OutputStream unbufOut = NetUtils.getOutputStream(sock, writeTimeout); InputStream unbufIn = NetUtils.getInputStream(sock); - if (dfsClient.shouldEncryptData() && - !dfsClient.trustedChannelResolver.isTrusted(sock.getInetAddress())) { - IOStreamPair encryptedStreams = - DataTransferEncryptor.getEncryptedStreams( - unbufOut, unbufIn, dfsClient.getDataEncryptionKey()); - unbufOut = encryptedStreams.out; - unbufIn = encryptedStreams.in; - } + IOStreamPair saslStreams = dfsClient.saslClient.socketSend(sock, + unbufOut, unbufIn, dfsClient, blockToken, src); + unbufOut = saslStreams.out; + unbufIn = saslStreams.in; out = new DataOutputStream(new BufferedOutputStream(unbufOut, HdfsConstants.SMALL_BUFFER_SIZE)); in = new DataInputStream(unbufIn); @@ -1325,14 +1320,10 @@ private boolean createBlockOutputStream(DatanodeInfo[] nodes, long newGS, OutputStream unbufOut = NetUtils.getOutputStream(s, writeTimeout); InputStream unbufIn = NetUtils.getInputStream(s); - if (dfsClient.shouldEncryptData() && - !dfsClient.trustedChannelResolver.isTrusted(s.getInetAddress())) { - IOStreamPair encryptedStreams = - DataTransferEncryptor.getEncryptedStreams(unbufOut, - unbufIn, dfsClient.getDataEncryptionKey()); - unbufOut = encryptedStreams.out; - unbufIn = encryptedStreams.in; - } + IOStreamPair saslStreams = dfsClient.saslClient.socketSend(s, + unbufOut, unbufIn, dfsClient, accessToken, nodes[0]); + unbufOut = saslStreams.out; + unbufIn = saslStreams.in; out = new DataOutputStream(new BufferedOutputStream(unbufOut, HdfsConstants.SMALL_BUFFER_SIZE)); blockReplyStream = new DataInputStream(unbufIn); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemotePeerFactory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemotePeerFactory.java index 17cdb3060c4..5afff0081b8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemotePeerFactory.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemotePeerFactory.java @@ -21,15 +21,21 @@ import java.net.InetSocketAddress; import org.apache.hadoop.hdfs.net.Peer; +import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; +import org.apache.hadoop.security.token.Token; public interface RemotePeerFactory { /** * @param addr The address to connect to. - * + * @param blockToken Token used during optional SASL negotiation + * @param datanodeId ID of destination DataNode * @return A new Peer connected to the address. * * @throws IOException If there was an error connecting or creating * the remote socket, encrypted stream, etc. */ - Peer newConnectedPeer(InetSocketAddress addr) throws IOException; + Peer newConnectedPeer(InetSocketAddress addr, + Token blockToken, DatanodeID datanodeId) + throws IOException; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/EncryptedPeer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/EncryptedPeer.java index e13213dc663..da660c7d464 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/EncryptedPeer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/EncryptedPeer.java @@ -19,9 +19,7 @@ import java.io.IOException; import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor; import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair; -import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey; import org.apache.hadoop.net.unix.DomainSocket; import java.io.InputStream; @@ -51,11 +49,8 @@ public class EncryptedPeer implements Peer { */ private final ReadableByteChannel channel; - public EncryptedPeer(Peer enclosedPeer, DataEncryptionKey key) - throws IOException { + public EncryptedPeer(Peer enclosedPeer, IOStreamPair ios) { this.enclosedPeer = enclosedPeer; - IOStreamPair ios = DataTransferEncryptor.getEncryptedStreams( - enclosedPeer.getOutputStream(), enclosedPeer.getInputStream(), key); this.in = ios.in; this.out = ios.out; this.channel = ios.in instanceof ReadableByteChannel ? diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/TcpPeerServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/TcpPeerServer.java index ddef592ec7c..2a547e0dcf4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/TcpPeerServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/TcpPeerServer.java @@ -28,10 +28,14 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey; +import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory; +import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient; +import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter.SecureResources; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.ipc.Server; +import org.apache.hadoop.security.token.Token; @InterfaceAudience.Private public class TcpPeerServer implements PeerServer { @@ -74,15 +78,16 @@ public static Peer peerFromSocket(Socket socket) } } - public static Peer peerFromSocketAndKey(Socket s, - DataEncryptionKey key) throws IOException { + public static Peer peerFromSocketAndKey( + SaslDataTransferClient saslClient, Socket s, + DataEncryptionKeyFactory keyFactory, + Token blockToken, DatanodeID datanodeId) + throws IOException { Peer peer = null; boolean success = false; try { - peer = peerFromSocket(s); - if (key != null) { - peer = new EncryptedPeer(peer, key); - } + peer = peerFromSocket(s); + peer = saslClient.peerSend(peer, keyFactory, blockToken, datanodeId); success = true; return peer; } finally { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferEncryptor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferEncryptor.java deleted file mode 100644 index e069a391b8b..00000000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferEncryptor.java +++ /dev/null @@ -1,506 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.protocol.datatransfer; - -import static org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed; - -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.util.Map; -import java.util.TreeMap; - -import javax.security.auth.callback.Callback; -import javax.security.auth.callback.CallbackHandler; -import javax.security.auth.callback.NameCallback; -import javax.security.auth.callback.PasswordCallback; -import javax.security.auth.callback.UnsupportedCallbackException; -import javax.security.sasl.AuthorizeCallback; -import javax.security.sasl.RealmCallback; -import javax.security.sasl.RealmChoiceCallback; -import javax.security.sasl.Sasl; -import javax.security.sasl.SaslClient; -import javax.security.sasl.SaslException; -import javax.security.sasl.SaslServer; - -import org.apache.commons.codec.binary.Base64; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto; -import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto.DataTransferEncryptorStatus; -import org.apache.hadoop.hdfs.security.token.block.BlockPoolTokenSecretManager; -import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey; -import org.apache.hadoop.security.SaslInputStream; -import org.apache.hadoop.security.SaslOutputStream; - -import com.google.common.base.Charsets; -import com.google.common.collect.Maps; -import com.google.protobuf.ByteString; - -/** - * A class which, given connected input/output streams, will perform a - * handshake using those streams based on SASL to produce new Input/Output - * streams which will encrypt/decrypt all data written/read from said streams. - * Much of this is inspired by or borrowed from the TSaslTransport in Apache - * Thrift, but with some HDFS-specific tweaks. - */ -@InterfaceAudience.Private -public class DataTransferEncryptor { - - public static final Log LOG = LogFactory.getLog(DataTransferEncryptor.class); - - /** - * Sent by clients and validated by servers. We use a number that's unlikely - * to ever be sent as the value of the DATA_TRANSFER_VERSION. - */ - private static final int ENCRYPTED_TRANSFER_MAGIC_NUMBER = 0xDEADBEEF; - - /** - * Delimiter for the three-part SASL username string. - */ - private static final String NAME_DELIMITER = " "; - - // This has to be set as part of the SASL spec, but it don't matter for - // our purposes, but may not be empty. It's sent over the wire, so use - // a short string. - private static final String SERVER_NAME = "0"; - - private static final String PROTOCOL = "hdfs"; - private static final String MECHANISM = "DIGEST-MD5"; - private static final Map SASL_PROPS = new TreeMap(); - - static { - SASL_PROPS.put(Sasl.QOP, "auth-conf"); - SASL_PROPS.put(Sasl.SERVER_AUTH, "true"); - } - - /** - * Factory method for DNs, where the nonce, keyId, and encryption key are not - * yet known. The nonce and keyId will be sent by the client, and the DN - * will then use those pieces of info and the secret key shared with the NN - * to determine the encryptionKey used for the SASL handshake/encryption. - * - * Establishes a secure connection assuming that the party on the other end - * has the same shared secret. This does a SASL connection handshake, but not - * a general-purpose one. It's specific to the MD5-DIGEST SASL mechanism with - * auth-conf enabled. In particular, it doesn't support an arbitrary number of - * challenge/response rounds, and we know that the client will never have an - * initial response, so we don't check for one. - * - * @param underlyingOut output stream to write to the other party - * @param underlyingIn input stream to read from the other party - * @param blockPoolTokenSecretManager secret manager capable of constructing - * encryption key based on keyId, blockPoolId, and nonce - * @return a pair of streams which wrap the given streams and encrypt/decrypt - * all data read/written - * @throws IOException in the event of error - */ - public static IOStreamPair getEncryptedStreams( - OutputStream underlyingOut, InputStream underlyingIn, - BlockPoolTokenSecretManager blockPoolTokenSecretManager, - String encryptionAlgorithm) throws IOException { - - DataInputStream in = new DataInputStream(underlyingIn); - DataOutputStream out = new DataOutputStream(underlyingOut); - - Map saslProps = Maps.newHashMap(SASL_PROPS); - saslProps.put("com.sun.security.sasl.digest.cipher", encryptionAlgorithm); - - if (LOG.isDebugEnabled()) { - LOG.debug("Server using encryption algorithm " + encryptionAlgorithm); - } - - SaslParticipant sasl = new SaslParticipant(Sasl.createSaslServer(MECHANISM, - PROTOCOL, SERVER_NAME, saslProps, - new SaslServerCallbackHandler(blockPoolTokenSecretManager))); - - int magicNumber = in.readInt(); - if (magicNumber != ENCRYPTED_TRANSFER_MAGIC_NUMBER) { - throw new InvalidMagicNumberException(magicNumber); - } - try { - // step 1 - performSaslStep1(out, in, sasl); - - // step 2 (server-side only) - byte[] remoteResponse = readSaslMessage(in); - byte[] localResponse = sasl.evaluateChallengeOrResponse(remoteResponse); - sendSaslMessage(out, localResponse); - - // SASL handshake is complete - checkSaslComplete(sasl); - - return sasl.createEncryptedStreamPair(out, in); - } catch (IOException ioe) { - if (ioe instanceof SaslException && - ioe.getCause() != null && - ioe.getCause() instanceof InvalidEncryptionKeyException) { - // This could just be because the client is long-lived and hasn't gotten - // a new encryption key from the NN in a while. Upon receiving this - // error, the client will get a new encryption key from the NN and retry - // connecting to this DN. - sendInvalidKeySaslErrorMessage(out, ioe.getCause().getMessage()); - } else { - sendGenericSaslErrorMessage(out, ioe.getMessage()); - } - throw ioe; - } - } - - /** - * Factory method for clients, where the encryption token is already created. - * - * Establishes a secure connection assuming that the party on the other end - * has the same shared secret. This does a SASL connection handshake, but not - * a general-purpose one. It's specific to the MD5-DIGEST SASL mechanism with - * auth-conf enabled. In particular, it doesn't support an arbitrary number of - * challenge/response rounds, and we know that the client will never have an - * initial response, so we don't check for one. - * - * @param underlyingOut output stream to write to the other party - * @param underlyingIn input stream to read from the other party - * @param encryptionKey all info required to establish an encrypted stream - * @return a pair of streams which wrap the given streams and encrypt/decrypt - * all data read/written - * @throws IOException in the event of error - */ - public static IOStreamPair getEncryptedStreams( - OutputStream underlyingOut, InputStream underlyingIn, - DataEncryptionKey encryptionKey) - throws IOException { - - Map saslProps = Maps.newHashMap(SASL_PROPS); - saslProps.put("com.sun.security.sasl.digest.cipher", - encryptionKey.encryptionAlgorithm); - - if (LOG.isDebugEnabled()) { - LOG.debug("Client using encryption algorithm " + - encryptionKey.encryptionAlgorithm); - } - - DataOutputStream out = new DataOutputStream(underlyingOut); - DataInputStream in = new DataInputStream(underlyingIn); - - String userName = getUserNameFromEncryptionKey(encryptionKey); - SaslParticipant sasl = new SaslParticipant(Sasl.createSaslClient( - new String[] { MECHANISM }, userName, PROTOCOL, SERVER_NAME, saslProps, - new SaslClientCallbackHandler(encryptionKey.encryptionKey, userName))); - - out.writeInt(ENCRYPTED_TRANSFER_MAGIC_NUMBER); - out.flush(); - - try { - // Start of handshake - "initial response" in SASL terminology. - sendSaslMessage(out, new byte[0]); - - // step 1 - performSaslStep1(out, in, sasl); - - // step 2 (client-side only) - byte[] remoteResponse = readSaslMessage(in); - byte[] localResponse = sasl.evaluateChallengeOrResponse(remoteResponse); - assert localResponse == null; - - // SASL handshake is complete - checkSaslComplete(sasl); - - return sasl.createEncryptedStreamPair(out, in); - } catch (IOException ioe) { - sendGenericSaslErrorMessage(out, ioe.getMessage()); - throw ioe; - } - } - - private static void performSaslStep1(DataOutputStream out, DataInputStream in, - SaslParticipant sasl) throws IOException { - byte[] remoteResponse = readSaslMessage(in); - byte[] localResponse = sasl.evaluateChallengeOrResponse(remoteResponse); - sendSaslMessage(out, localResponse); - } - - private static void checkSaslComplete(SaslParticipant sasl) throws IOException { - if (!sasl.isComplete()) { - throw new IOException("Failed to complete SASL handshake"); - } - - if (!sasl.supportsConfidentiality()) { - throw new IOException("SASL handshake completed, but channel does not " + - "support encryption"); - } - } - - private static void sendSaslMessage(DataOutputStream out, byte[] payload) - throws IOException { - sendSaslMessage(out, DataTransferEncryptorStatus.SUCCESS, payload, null); - } - - private static void sendInvalidKeySaslErrorMessage(DataOutputStream out, - String message) throws IOException { - sendSaslMessage(out, DataTransferEncryptorStatus.ERROR_UNKNOWN_KEY, null, - message); - } - - private static void sendGenericSaslErrorMessage(DataOutputStream out, - String message) throws IOException { - sendSaslMessage(out, DataTransferEncryptorStatus.ERROR, null, message); - } - - private static void sendSaslMessage(OutputStream out, - DataTransferEncryptorStatus status, byte[] payload, String message) - throws IOException { - DataTransferEncryptorMessageProto.Builder builder = - DataTransferEncryptorMessageProto.newBuilder(); - - builder.setStatus(status); - if (payload != null) { - builder.setPayload(ByteString.copyFrom(payload)); - } - if (message != null) { - builder.setMessage(message); - } - - DataTransferEncryptorMessageProto proto = builder.build(); - proto.writeDelimitedTo(out); - out.flush(); - } - - private static byte[] readSaslMessage(DataInputStream in) throws IOException { - DataTransferEncryptorMessageProto proto = - DataTransferEncryptorMessageProto.parseFrom(vintPrefixed(in)); - if (proto.getStatus() == DataTransferEncryptorStatus.ERROR_UNKNOWN_KEY) { - throw new InvalidEncryptionKeyException(proto.getMessage()); - } else if (proto.getStatus() == DataTransferEncryptorStatus.ERROR) { - throw new IOException(proto.getMessage()); - } else { - return proto.getPayload().toByteArray(); - } - } - - /** - * Set the encryption key when asked by the server-side SASL object. - */ - private static class SaslServerCallbackHandler implements CallbackHandler { - - private final BlockPoolTokenSecretManager blockPoolTokenSecretManager; - - public SaslServerCallbackHandler(BlockPoolTokenSecretManager - blockPoolTokenSecretManager) { - this.blockPoolTokenSecretManager = blockPoolTokenSecretManager; - } - - @Override - public void handle(Callback[] callbacks) throws IOException, - UnsupportedCallbackException { - NameCallback nc = null; - PasswordCallback pc = null; - AuthorizeCallback ac = null; - for (Callback callback : callbacks) { - if (callback instanceof AuthorizeCallback) { - ac = (AuthorizeCallback) callback; - } else if (callback instanceof PasswordCallback) { - pc = (PasswordCallback) callback; - } else if (callback instanceof NameCallback) { - nc = (NameCallback) callback; - } else if (callback instanceof RealmCallback) { - continue; // realm is ignored - } else { - throw new UnsupportedCallbackException(callback, - "Unrecognized SASL DIGEST-MD5 Callback: " + callback); - } - } - - if (pc != null) { - byte[] encryptionKey = getEncryptionKeyFromUserName( - blockPoolTokenSecretManager, nc.getDefaultName()); - pc.setPassword(encryptionKeyToPassword(encryptionKey)); - } - - if (ac != null) { - ac.setAuthorized(true); - ac.setAuthorizedID(ac.getAuthorizationID()); - } - - } - - } - - /** - * Set the encryption key when asked by the client-side SASL object. - */ - private static class SaslClientCallbackHandler implements CallbackHandler { - - private final byte[] encryptionKey; - private final String userName; - - public SaslClientCallbackHandler(byte[] encryptionKey, String userName) { - this.encryptionKey = encryptionKey; - this.userName = userName; - } - - @Override - public void handle(Callback[] callbacks) throws IOException, - UnsupportedCallbackException { - NameCallback nc = null; - PasswordCallback pc = null; - RealmCallback rc = null; - for (Callback callback : callbacks) { - if (callback instanceof RealmChoiceCallback) { - continue; - } else if (callback instanceof NameCallback) { - nc = (NameCallback) callback; - } else if (callback instanceof PasswordCallback) { - pc = (PasswordCallback) callback; - } else if (callback instanceof RealmCallback) { - rc = (RealmCallback) callback; - } else { - throw new UnsupportedCallbackException(callback, - "Unrecognized SASL client callback"); - } - } - if (nc != null) { - nc.setName(userName); - } - if (pc != null) { - pc.setPassword(encryptionKeyToPassword(encryptionKey)); - } - if (rc != null) { - rc.setText(rc.getDefaultText()); - } - } - - } - - /** - * The SASL username consists of the keyId, blockPoolId, and nonce with the - * first two encoded as Strings, and the third encoded using Base64. The - * fields are each separated by a single space. - * - * @param encryptionKey the encryption key to encode as a SASL username. - * @return encoded username containing keyId, blockPoolId, and nonce - */ - private static String getUserNameFromEncryptionKey( - DataEncryptionKey encryptionKey) { - return encryptionKey.keyId + NAME_DELIMITER + - encryptionKey.blockPoolId + NAME_DELIMITER + - new String(Base64.encodeBase64(encryptionKey.nonce, false), Charsets.UTF_8); - } - - /** - * Given a secret manager and a username encoded as described above, determine - * the encryption key. - * - * @param blockPoolTokenSecretManager to determine the encryption key. - * @param userName containing the keyId, blockPoolId, and nonce. - * @return secret encryption key. - * @throws IOException - */ - private static byte[] getEncryptionKeyFromUserName( - BlockPoolTokenSecretManager blockPoolTokenSecretManager, String userName) - throws IOException { - String[] nameComponents = userName.split(NAME_DELIMITER); - if (nameComponents.length != 3) { - throw new IOException("Provided name '" + userName + "' has " + - nameComponents.length + " components instead of the expected 3."); - } - int keyId = Integer.parseInt(nameComponents[0]); - String blockPoolId = nameComponents[1]; - byte[] nonce = Base64.decodeBase64(nameComponents[2]); - return blockPoolTokenSecretManager.retrieveDataEncryptionKey(keyId, - blockPoolId, nonce); - } - - private static char[] encryptionKeyToPassword(byte[] encryptionKey) { - return new String(Base64.encodeBase64(encryptionKey, false), Charsets.UTF_8).toCharArray(); - } - - /** - * Strongly inspired by Thrift's TSaslTransport class. - * - * Used to abstract over the SaslServer and - * SaslClient classes, which share a lot of their interface, but - * unfortunately don't share a common superclass. - */ - private static class SaslParticipant { - // One of these will always be null. - public SaslServer saslServer; - public SaslClient saslClient; - - public SaslParticipant(SaslServer saslServer) { - this.saslServer = saslServer; - } - - public SaslParticipant(SaslClient saslClient) { - this.saslClient = saslClient; - } - - public byte[] evaluateChallengeOrResponse(byte[] challengeOrResponse) throws SaslException { - if (saslClient != null) { - return saslClient.evaluateChallenge(challengeOrResponse); - } else { - return saslServer.evaluateResponse(challengeOrResponse); - } - } - - public boolean isComplete() { - if (saslClient != null) - return saslClient.isComplete(); - else - return saslServer.isComplete(); - } - - public boolean supportsConfidentiality() { - String qop = null; - if (saslClient != null) { - qop = (String) saslClient.getNegotiatedProperty(Sasl.QOP); - } else { - qop = (String) saslServer.getNegotiatedProperty(Sasl.QOP); - } - return qop != null && qop.equals("auth-conf"); - } - - // Return some input/output streams that will henceforth have their - // communication encrypted. - private IOStreamPair createEncryptedStreamPair( - DataOutputStream out, DataInputStream in) { - if (saslClient != null) { - return new IOStreamPair( - new SaslInputStream(in, saslClient), - new SaslOutputStream(out, saslClient)); - } else { - return new IOStreamPair( - new SaslInputStream(in, saslServer), - new SaslOutputStream(out, saslServer)); - } - } - } - - @InterfaceAudience.Private - public static class InvalidMagicNumberException extends IOException { - - private static final long serialVersionUID = 1L; - - public InvalidMagicNumberException(int magicNumber) { - super(String.format("Received %x instead of %x from client.", - magicNumber, ENCRYPTED_TRANSFER_MAGIC_NUMBER)); - } - } - -} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/DataEncryptionKeyFactory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/DataEncryptionKeyFactory.java new file mode 100644 index 00000000000..959cba0fb48 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/DataEncryptionKeyFactory.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.protocol.datatransfer.sasl; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey; + +/** + * Creates a new {@link DataEncryptionKey} on demand. + */ +@InterfaceAudience.Private +public interface DataEncryptionKeyFactory { + + /** + * Creates a new DataEncryptionKey. + * + * @return DataEncryptionKey newly created + * @throws IOException for any error + */ + DataEncryptionKey newDataEncryptionKey() throws IOException; +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/DataTransferSaslUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/DataTransferSaslUtil.java new file mode 100644 index 00000000000..cd18b9fa0fa --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/DataTransferSaslUtil.java @@ -0,0 +1,267 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.protocol.datatransfer.sasl; + +import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_RPC_PROTECTION; +import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_SASL_PROPS_RESOLVER_CLASS; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_TRANSFER_SASL_PROPS_RESOLVER_CLASS_KEY; +import static org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.InetAddress; +import java.util.Arrays; +import java.util.Map; +import java.util.Set; +import javax.security.sasl.Sasl; + +import org.apache.commons.codec.binary.Base64; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.net.Peer; +import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException; +import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto; +import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto.DataTransferEncryptorStatus; +import org.apache.hadoop.security.SaslPropertiesResolver; +import org.apache.hadoop.security.SaslRpcServer.QualityOfProtection; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Charsets; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Maps; +import com.google.common.net.InetAddresses; +import com.google.protobuf.ByteString; + +/** + * Utility methods implementing SASL negotiation for DataTransferProtocol. + */ +@InterfaceAudience.Private +public final class DataTransferSaslUtil { + + private static final Logger LOG = LoggerFactory.getLogger( + DataTransferSaslUtil.class); + + /** + * Delimiter for the three-part SASL username string. + */ + public static final String NAME_DELIMITER = " "; + + /** + * Sent by clients and validated by servers. We use a number that's unlikely + * to ever be sent as the value of the DATA_TRANSFER_VERSION. + */ + public static final int SASL_TRANSFER_MAGIC_NUMBER = 0xDEADBEEF; + + /** + * Checks that SASL negotiation has completed for the given participant, and + * the negotiated quality of protection is included in the given SASL + * properties and therefore acceptable. + * + * @param sasl participant to check + * @param saslProps properties of SASL negotiation + * @throws IOException for any error + */ + public static void checkSaslComplete(SaslParticipant sasl, + Map saslProps) throws IOException { + if (!sasl.isComplete()) { + throw new IOException("Failed to complete SASL handshake"); + } + Set requestedQop = ImmutableSet.copyOf(Arrays.asList( + saslProps.get(Sasl.QOP).split(","))); + String negotiatedQop = sasl.getNegotiatedQop(); + LOG.debug("Verifying QOP, requested QOP = {}, negotiated QOP = {}", + requestedQop, negotiatedQop); + if (!requestedQop.contains(negotiatedQop)) { + throw new IOException(String.format("SASL handshake completed, but " + + "channel does not have acceptable quality of protection, " + + "requested = %s, negotiated = %s", requestedQop, negotiatedQop)); + } + } + + /** + * Creates SASL properties required for an encrypted SASL negotiation. + * + * @param encryptionAlgorithm to use for SASL negotation + * @return properties of encrypted SASL negotiation + */ + public static Map createSaslPropertiesForEncryption( + String encryptionAlgorithm) { + Map saslProps = Maps.newHashMapWithExpectedSize(3); + saslProps.put(Sasl.QOP, QualityOfProtection.PRIVACY.getSaslQop()); + saslProps.put(Sasl.SERVER_AUTH, "true"); + saslProps.put("com.sun.security.sasl.digest.cipher", encryptionAlgorithm); + return saslProps; + } + + /** + * For an encrypted SASL negotiation, encodes an encryption key to a SASL + * password. + * + * @param encryptionKey to encode + * @return key encoded as SASL password + */ + public static char[] encryptionKeyToPassword(byte[] encryptionKey) { + return new String(Base64.encodeBase64(encryptionKey, false), Charsets.UTF_8) + .toCharArray(); + } + + /** + * Returns InetAddress from peer. The getRemoteAddressString has the form + * [host][/ip-address]:port. The host may be missing. The IP address (and + * preceding '/') may be missing. The port preceded by ':' is always present. + * + * @param peer + * @return InetAddress from peer + */ + public static InetAddress getPeerAddress(Peer peer) { + String remoteAddr = peer.getRemoteAddressString().split(":")[0]; + int slashIdx = remoteAddr.indexOf('/'); + return InetAddresses.forString(slashIdx != -1 ? + remoteAddr.substring(slashIdx + 1, remoteAddr.length()) : + remoteAddr); + } + + /** + * Creates a SaslPropertiesResolver from the given configuration. This method + * works by cloning the configuration, translating configuration properties + * specific to DataTransferProtocol to what SaslPropertiesResolver expects, + * and then delegating to SaslPropertiesResolver for initialization. This + * method returns null if SASL protection has not been configured for + * DataTransferProtocol. + * + * @param conf configuration to read + * @return SaslPropertiesResolver for DataTransferProtocol, or null if not + * configured + */ + public static SaslPropertiesResolver getSaslPropertiesResolver( + Configuration conf) { + String qops = conf.get(DFS_DATA_TRANSFER_PROTECTION_KEY); + if (qops == null || qops.isEmpty()) { + LOG.debug("DataTransferProtocol not using SaslPropertiesResolver, no " + + "QOP found in configuration for {}", DFS_DATA_TRANSFER_PROTECTION_KEY); + return null; + } + Configuration saslPropsResolverConf = new Configuration(conf); + saslPropsResolverConf.set(HADOOP_RPC_PROTECTION, qops); + Class resolverClass = conf.getClass( + DFS_DATA_TRANSFER_SASL_PROPS_RESOLVER_CLASS_KEY, + SaslPropertiesResolver.class, SaslPropertiesResolver.class); + saslPropsResolverConf.setClass(HADOOP_SECURITY_SASL_PROPS_RESOLVER_CLASS, + resolverClass, SaslPropertiesResolver.class); + SaslPropertiesResolver resolver = SaslPropertiesResolver.getInstance( + saslPropsResolverConf); + LOG.debug("DataTransferProtocol using SaslPropertiesResolver, configured " + + "QOP {} = {}, configured class {} = {}", DFS_DATA_TRANSFER_PROTECTION_KEY, qops, + DFS_DATA_TRANSFER_SASL_PROPS_RESOLVER_CLASS_KEY, resolverClass); + return resolver; + } + + /** + * Performs the first step of SASL negotiation. + * + * @param out connection output stream + * @param in connection input stream + * @param sasl participant + */ + public static void performSaslStep1(OutputStream out, InputStream in, + SaslParticipant sasl) throws IOException { + byte[] remoteResponse = readSaslMessage(in); + byte[] localResponse = sasl.evaluateChallengeOrResponse(remoteResponse); + sendSaslMessage(out, localResponse); + } + + /** + * Reads a SASL negotiation message. + * + * @param in stream to read + * @return bytes of SASL negotiation messsage + * @throws IOException for any error + */ + public static byte[] readSaslMessage(InputStream in) throws IOException { + DataTransferEncryptorMessageProto proto = + DataTransferEncryptorMessageProto.parseFrom(vintPrefixed(in)); + if (proto.getStatus() == DataTransferEncryptorStatus.ERROR_UNKNOWN_KEY) { + throw new InvalidEncryptionKeyException(proto.getMessage()); + } else if (proto.getStatus() == DataTransferEncryptorStatus.ERROR) { + throw new IOException(proto.getMessage()); + } else { + return proto.getPayload().toByteArray(); + } + } + + /** + * Sends a SASL negotiation message indicating an error. + * + * @param out stream to receive message + * @param message to send + * @throws IOException for any error + */ + public static void sendGenericSaslErrorMessage(OutputStream out, + String message) throws IOException { + sendSaslMessage(out, DataTransferEncryptorStatus.ERROR, null, message); + } + + /** + * Sends a SASL negotiation message. + * + * @param out stream to receive message + * @param payload to send + * @throws IOException for any error + */ + public static void sendSaslMessage(OutputStream out, byte[] payload) + throws IOException { + sendSaslMessage(out, DataTransferEncryptorStatus.SUCCESS, payload, null); + } + + /** + * Sends a SASL negotiation message. + * + * @param out stream to receive message + * @param status negotiation status + * @param payload to send + * @param message to send + * @throws IOException for any error + */ + public static void sendSaslMessage(OutputStream out, + DataTransferEncryptorStatus status, byte[] payload, String message) + throws IOException { + DataTransferEncryptorMessageProto.Builder builder = + DataTransferEncryptorMessageProto.newBuilder(); + + builder.setStatus(status); + if (payload != null) { + builder.setPayload(ByteString.copyFrom(payload)); + } + if (message != null) { + builder.setMessage(message); + } + + DataTransferEncryptorMessageProto proto = builder.build(); + proto.writeDelimitedTo(out); + out.flush(); + } + + /** + * There is no reason to instantiate this class. + */ + private DataTransferSaslUtil() { + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/InvalidMagicNumberException.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/InvalidMagicNumberException.java new file mode 100644 index 00000000000..eb494a87e17 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/InvalidMagicNumberException.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.protocol.datatransfer.sasl; + +import static org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil.SASL_TRANSFER_MAGIC_NUMBER; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience; + +/** + * Indicates that SASL protocol negotiation expected to read a pre-defined magic + * number, but the expected value was not seen. + */ +@InterfaceAudience.Private +public class InvalidMagicNumberException extends IOException { + + private static final long serialVersionUID = 1L; + + /** + * Creates a new InvalidMagicNumberException. + * + * @param magicNumber expected value + */ + public InvalidMagicNumberException(int magicNumber) { + super(String.format("Received %x instead of %x from client.", + magicNumber, SASL_TRANSFER_MAGIC_NUMBER)); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferClient.java new file mode 100644 index 00000000000..643af4a9fb1 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferClient.java @@ -0,0 +1,439 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.protocol.datatransfer.sasl; + +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY; +import static org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil.*; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.InetAddress; +import java.net.Socket; +import java.util.Map; + +import javax.security.auth.callback.Callback; +import javax.security.auth.callback.CallbackHandler; +import javax.security.auth.callback.NameCallback; +import javax.security.auth.callback.PasswordCallback; +import javax.security.auth.callback.UnsupportedCallbackException; +import javax.security.sasl.RealmCallback; +import javax.security.sasl.RealmChoiceCallback; + +import org.apache.commons.codec.binary.Base64; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hdfs.net.EncryptedPeer; +import org.apache.hadoop.hdfs.net.Peer; +import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair; +import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver; +import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; +import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey; +import org.apache.hadoop.security.SaslPropertiesResolver; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Charsets; + +/** + * Negotiates SASL for DataTransferProtocol on behalf of a client. There are + * two possible supported variants of SASL negotiation: either a general-purpose + * negotiation supporting any quality of protection, or a specialized + * negotiation that enforces privacy as the quality of protection using a + * cryptographically strong encryption key. + * + * This class is used in both the HDFS client and the DataNode. The DataNode + * needs it, because it acts as a client to other DataNodes during write + * pipelines and block transfers. + */ +@InterfaceAudience.Private +public class SaslDataTransferClient { + + private static final Logger LOG = LoggerFactory.getLogger( + SaslDataTransferClient.class); + + private final boolean fallbackToSimpleAuthAllowed; + private final SaslPropertiesResolver saslPropsResolver; + private final TrustedChannelResolver trustedChannelResolver; + + /** + * Creates a new SaslDataTransferClient. + * + * @param saslPropsResolver for determining properties of SASL negotiation + * @param trustedChannelResolver for identifying trusted connections that do + * not require SASL negotiation + */ + public SaslDataTransferClient(SaslPropertiesResolver saslPropsResolver, + TrustedChannelResolver trustedChannelResolver, + boolean fallbackToSimpleAuthAllowed) { + this.fallbackToSimpleAuthAllowed = fallbackToSimpleAuthAllowed; + this.saslPropsResolver = saslPropsResolver; + this.trustedChannelResolver = trustedChannelResolver; + } + + /** + * Sends client SASL negotiation for a newly allocated socket if required. + * + * @param socket connection socket + * @param underlyingOut connection output stream + * @param underlyingIn connection input stream + * @param encryptionKeyFactory for creation of an encryption key + * @param accessToken connection block access token + * @param datanodeId ID of destination DataNode + * @return new pair of streams, wrapped after SASL negotiation + * @throws IOException for any error + */ + public IOStreamPair newSocketSend(Socket socket, OutputStream underlyingOut, + InputStream underlyingIn, DataEncryptionKeyFactory encryptionKeyFactory, + Token accessToken, DatanodeID datanodeId) + throws IOException { + // The encryption key factory only returns a key if encryption is enabled. + DataEncryptionKey encryptionKey = !trustedChannelResolver.isTrusted() ? + encryptionKeyFactory.newDataEncryptionKey() : null; + IOStreamPair ios = send(socket.getInetAddress(), underlyingOut, + underlyingIn, encryptionKey, accessToken, datanodeId); + return ios != null ? ios : new IOStreamPair(underlyingIn, underlyingOut); + } + + /** + * Sends client SASL negotiation for a peer if required. + * + * @param peer connection peer + * @param encryptionKeyFactory for creation of an encryption key + * @param accessToken connection block access token + * @param datanodeId ID of destination DataNode + * @return new pair of streams, wrapped after SASL negotiation + * @throws IOException for any error + */ + public Peer peerSend(Peer peer, DataEncryptionKeyFactory encryptionKeyFactory, + Token accessToken, DatanodeID datanodeId) + throws IOException { + IOStreamPair ios = checkTrustAndSend(getPeerAddress(peer), + peer.getOutputStream(), peer.getInputStream(), encryptionKeyFactory, + accessToken, datanodeId); + // TODO: Consider renaming EncryptedPeer to SaslPeer. + return ios != null ? new EncryptedPeer(peer, ios) : peer; + } + + /** + * Sends client SASL negotiation for a socket if required. + * + * @param socket connection socket + * @param underlyingOut connection output stream + * @param underlyingIn connection input stream + * @param encryptionKeyFactory for creation of an encryption key + * @param accessToken connection block access token + * @param datanodeId ID of destination DataNode + * @return new pair of streams, wrapped after SASL negotiation + * @throws IOException for any error + */ + public IOStreamPair socketSend(Socket socket, OutputStream underlyingOut, + InputStream underlyingIn, DataEncryptionKeyFactory encryptionKeyFactory, + Token accessToken, DatanodeID datanodeId) + throws IOException { + IOStreamPair ios = checkTrustAndSend(socket.getInetAddress(), underlyingOut, + underlyingIn, encryptionKeyFactory, accessToken, datanodeId); + return ios != null ? ios : new IOStreamPair(underlyingIn, underlyingOut); + } + + /** + * Checks if an address is already trusted and then sends client SASL + * negotiation if required. + * + * @param addr connection address + * @param underlyingOut connection output stream + * @param underlyingIn connection input stream + * @param encryptionKeyFactory for creation of an encryption key + * @param accessToken connection block access token + * @param datanodeId ID of destination DataNode + * @return new pair of streams, wrapped after SASL negotiation + * @throws IOException for any error + */ + private IOStreamPair checkTrustAndSend(InetAddress addr, + OutputStream underlyingOut, InputStream underlyingIn, + DataEncryptionKeyFactory encryptionKeyFactory, + Token accessToken, DatanodeID datanodeId) + throws IOException { + if (!trustedChannelResolver.isTrusted() && + !trustedChannelResolver.isTrusted(addr)) { + // The encryption key factory only returns a key if encryption is enabled. + DataEncryptionKey encryptionKey = + encryptionKeyFactory.newDataEncryptionKey(); + return send(addr, underlyingOut, underlyingIn, encryptionKey, accessToken, + datanodeId); + } else { + LOG.debug( + "SASL client skipping handshake on trusted connection for addr = {}, " + + "datanodeId = {}", addr, datanodeId); + return null; + } + } + + /** + * Sends client SASL negotiation if required. Determines the correct type of + * SASL handshake based on configuration. + * + * @param addr connection address + * @param underlyingOut connection output stream + * @param underlyingIn connection input stream + * @param encryptionKey for an encrypted SASL handshake + * @param accessToken connection block access token + * @param datanodeId ID of destination DataNode + * @return new pair of streams, wrapped after SASL negotiation + * @throws IOException for any error + */ + private IOStreamPair send(InetAddress addr, OutputStream underlyingOut, + InputStream underlyingIn, DataEncryptionKey encryptionKey, + Token accessToken, DatanodeID datanodeId) + throws IOException { + if (encryptionKey != null) { + LOG.debug( + "SASL client doing encrypted handshake for addr = {}, datanodeId = {}", + addr, datanodeId); + return getEncryptedStreams(underlyingOut, underlyingIn, + encryptionKey); + } else if (!UserGroupInformation.isSecurityEnabled()) { + LOG.debug( + "SASL client skipping handshake in unsecured configuration for " + + "addr = {}, datanodeId = {}", addr, datanodeId); + return null; + } else if (datanodeId.getXferPort() < 1024) { + LOG.debug( + "SASL client skipping handshake in secured configuration with " + + "privileged port for addr = {}, datanodeId = {}", addr, datanodeId); + return null; + } else if (accessToken.getIdentifier().length == 0) { + if (!fallbackToSimpleAuthAllowed) { + throw new IOException( + "No block access token was provided (insecure cluster), but this " + + "client is configured to allow only secure connections."); + } + LOG.debug( + "SASL client skipping handshake in secured configuration with " + + "unsecured cluster for addr = {}, datanodeId = {}", addr, datanodeId); + return null; + } else { + LOG.debug( + "SASL client doing general handshake for addr = {}, datanodeId = {}", + addr, datanodeId); + return getSaslStreams(addr, underlyingOut, underlyingIn, accessToken, + datanodeId); + } + } + + /** + * Sends client SASL negotiation for specialized encrypted handshake. + * + * @param underlyingOut connection output stream + * @param underlyingIn connection input stream + * @param encryptionKey for an encrypted SASL handshake + * @return new pair of streams, wrapped after SASL negotiation + * @throws IOException for any error + */ + private IOStreamPair getEncryptedStreams(OutputStream underlyingOut, + InputStream underlyingIn, DataEncryptionKey encryptionKey) + throws IOException { + Map saslProps = createSaslPropertiesForEncryption( + encryptionKey.encryptionAlgorithm); + + LOG.debug("Client using encryption algorithm {}", + encryptionKey.encryptionAlgorithm); + + String userName = getUserNameFromEncryptionKey(encryptionKey); + char[] password = encryptionKeyToPassword(encryptionKey.encryptionKey); + CallbackHandler callbackHandler = new SaslClientCallbackHandler(userName, + password); + return doSaslHandshake(underlyingOut, underlyingIn, userName, saslProps, + callbackHandler); + } + + /** + * The SASL username for an encrypted handshake consists of the keyId, + * blockPoolId, and nonce with the first two encoded as Strings, and the third + * encoded using Base64. The fields are each separated by a single space. + * + * @param encryptionKey the encryption key to encode as a SASL username. + * @return encoded username containing keyId, blockPoolId, and nonce + */ + private static String getUserNameFromEncryptionKey( + DataEncryptionKey encryptionKey) { + return encryptionKey.keyId + NAME_DELIMITER + + encryptionKey.blockPoolId + NAME_DELIMITER + + new String(Base64.encodeBase64(encryptionKey.nonce, false), Charsets.UTF_8); + } + + /** + * Sets user name and password when asked by the client-side SASL object. + */ + private static final class SaslClientCallbackHandler + implements CallbackHandler { + + private final char[] password; + private final String userName; + + /** + * Creates a new SaslClientCallbackHandler. + * + * @param userName SASL user name + * @Param password SASL password + */ + public SaslClientCallbackHandler(String userName, char[] password) { + this.password = password; + this.userName = userName; + } + + @Override + public void handle(Callback[] callbacks) throws IOException, + UnsupportedCallbackException { + NameCallback nc = null; + PasswordCallback pc = null; + RealmCallback rc = null; + for (Callback callback : callbacks) { + if (callback instanceof RealmChoiceCallback) { + continue; + } else if (callback instanceof NameCallback) { + nc = (NameCallback) callback; + } else if (callback instanceof PasswordCallback) { + pc = (PasswordCallback) callback; + } else if (callback instanceof RealmCallback) { + rc = (RealmCallback) callback; + } else { + throw new UnsupportedCallbackException(callback, + "Unrecognized SASL client callback"); + } + } + if (nc != null) { + nc.setName(userName); + } + if (pc != null) { + pc.setPassword(password); + } + if (rc != null) { + rc.setText(rc.getDefaultText()); + } + } + } + + /** + * Sends client SASL negotiation for general-purpose handshake. + * + * @param addr connection address + * @param underlyingOut connection output stream + * @param underlyingIn connection input stream + * @param accessToken connection block access token + * @param datanodeId ID of destination DataNode + * @return new pair of streams, wrapped after SASL negotiation + * @throws IOException for any error + */ + private IOStreamPair getSaslStreams(InetAddress addr, + OutputStream underlyingOut, InputStream underlyingIn, + Token accessToken, DatanodeID datanodeId) + throws IOException { + if (saslPropsResolver == null) { + throw new IOException(String.format("Cannot create a secured " + + "connection if DataNode listens on unprivileged port (%d) and no " + + "protection is defined in configuration property %s.", + datanodeId.getXferPort(), DFS_DATA_TRANSFER_PROTECTION_KEY)); + } + Map saslProps = saslPropsResolver.getClientProperties(addr); + + String userName = buildUserName(accessToken); + char[] password = buildClientPassword(accessToken); + CallbackHandler callbackHandler = new SaslClientCallbackHandler(userName, + password); + return doSaslHandshake(underlyingOut, underlyingIn, userName, saslProps, + callbackHandler); + } + + /** + * Builds the client's user name for the general-purpose handshake, consisting + * of the base64-encoded serialized block access token identifier. Note that + * this includes only the token identifier, not the token itself, which would + * include the password. The password is a shared secret, and we must not + * write it on the network during the SASL authentication exchange. + * + * @param blockToken for block access + * @return SASL user name + */ + private static String buildUserName(Token blockToken) { + return new String(Base64.encodeBase64(blockToken.getIdentifier(), false), + Charsets.UTF_8); + } + + /** + * Calculates the password on the client side for the general-purpose + * handshake. The password consists of the block access token's password. + * + * @param blockToken for block access + * @return SASL password + */ + private char[] buildClientPassword(Token blockToken) { + return new String(Base64.encodeBase64(blockToken.getPassword(), false), + Charsets.UTF_8).toCharArray(); + } + + /** + * This method actually executes the client-side SASL handshake. + * + * @param underlyingOut connection output stream + * @param underlyingIn connection input stream + * @param userName SASL user name + * @param saslProps properties of SASL negotiation + * @param callbackHandler for responding to SASL callbacks + * @return new pair of streams, wrapped after SASL negotiation + * @throws IOException for any error + */ + private IOStreamPair doSaslHandshake(OutputStream underlyingOut, + InputStream underlyingIn, String userName, Map saslProps, + CallbackHandler callbackHandler) throws IOException { + + DataOutputStream out = new DataOutputStream(underlyingOut); + DataInputStream in = new DataInputStream(underlyingIn); + + SaslParticipant sasl= SaslParticipant.createClientSaslParticipant(userName, + saslProps, callbackHandler); + + out.writeInt(SASL_TRANSFER_MAGIC_NUMBER); + out.flush(); + + try { + // Start of handshake - "initial response" in SASL terminology. + sendSaslMessage(out, new byte[0]); + + // step 1 + performSaslStep1(out, in, sasl); + + // step 2 (client-side only) + byte[] remoteResponse = readSaslMessage(in); + byte[] localResponse = sasl.evaluateChallengeOrResponse(remoteResponse); + assert localResponse == null; + + // SASL handshake is complete + checkSaslComplete(sasl, saslProps); + + return sasl.createStreamPair(out, in); + } catch (IOException ioe) { + sendGenericSaslErrorMessage(out, ioe.getMessage()); + throw ioe; + } + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferServer.java new file mode 100644 index 00000000000..78570579323 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferServer.java @@ -0,0 +1,381 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.protocol.datatransfer.sasl; + +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY; +import static org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil.*; + +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.Map; + +import javax.security.auth.callback.Callback; +import javax.security.auth.callback.CallbackHandler; +import javax.security.auth.callback.NameCallback; +import javax.security.auth.callback.PasswordCallback; +import javax.security.auth.callback.UnsupportedCallbackException; +import javax.security.sasl.AuthorizeCallback; +import javax.security.sasl.RealmCallback; +import javax.security.sasl.SaslException; + +import org.apache.commons.codec.binary.Base64; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hdfs.net.Peer; +import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair; +import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException; +import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto.DataTransferEncryptorStatus; +import org.apache.hadoop.hdfs.security.token.block.BlockPoolTokenSecretManager; +import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; +import org.apache.hadoop.hdfs.server.datanode.DNConf; +import org.apache.hadoop.security.SaslPropertiesResolver; +import org.apache.hadoop.security.UserGroupInformation; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Charsets; + +/** + * Negotiates SASL for DataTransferProtocol on behalf of a server. There are + * two possible supported variants of SASL negotiation: either a general-purpose + * negotiation supporting any quality of protection, or a specialized + * negotiation that enforces privacy as the quality of protection using a + * cryptographically strong encryption key. + * + * This class is used in the DataNode for handling inbound connections. + */ +@InterfaceAudience.Private +public class SaslDataTransferServer { + + private static final Logger LOG = LoggerFactory.getLogger( + SaslDataTransferServer.class); + + private final BlockPoolTokenSecretManager blockPoolTokenSecretManager; + private final DNConf dnConf; + + /** + * Creates a new SaslDataTransferServer. + * + * @param dnConf configuration of DataNode + * @param blockPoolTokenSecretManager used for checking block access tokens + * and encryption keys + */ + public SaslDataTransferServer(DNConf dnConf, + BlockPoolTokenSecretManager blockPoolTokenSecretManager) { + this.blockPoolTokenSecretManager = blockPoolTokenSecretManager; + this.dnConf = dnConf; + } + + /** + * Receives SASL negotiation from a peer on behalf of a server. + * + * @param peer connection peer + * @param underlyingOut connection output stream + * @param underlyingIn connection input stream + * @param datanodeId ID of DataNode accepting connection + * @return new pair of streams, wrapped after SASL negotiation + * @throws IOException for any error + */ + public IOStreamPair receive(Peer peer, OutputStream underlyingOut, + InputStream underlyingIn, DatanodeID datanodeId) throws IOException { + if (dnConf.getEncryptDataTransfer()) { + LOG.debug( + "SASL server doing encrypted handshake for peer = {}, datanodeId = {}", + peer, datanodeId); + return getEncryptedStreams(peer, underlyingOut, underlyingIn); + } else if (!UserGroupInformation.isSecurityEnabled()) { + LOG.debug( + "SASL server skipping handshake in unsecured configuration for " + + "peer = {}, datanodeId = {}", peer, datanodeId); + return new IOStreamPair(underlyingIn, underlyingOut); + } else if (datanodeId.getXferPort() < 1024) { + LOG.debug( + "SASL server skipping handshake in unsecured configuration for " + + "peer = {}, datanodeId = {}", peer, datanodeId); + return new IOStreamPair(underlyingIn, underlyingOut); + } else { + LOG.debug( + "SASL server doing general handshake for peer = {}, datanodeId = {}", + peer, datanodeId); + return getSaslStreams(peer, underlyingOut, underlyingIn, datanodeId); + } + } + + /** + * Receives SASL negotiation for specialized encrypted handshake. + * + * @param peer connection peer + * @param underlyingOut connection output stream + * @param underlyingIn connection input stream + * @return new pair of streams, wrapped after SASL negotiation + * @throws IOException for any error + */ + private IOStreamPair getEncryptedStreams(Peer peer, + OutputStream underlyingOut, InputStream underlyingIn) throws IOException { + if (peer.hasSecureChannel() || + dnConf.getTrustedChannelResolver().isTrusted(getPeerAddress(peer))) { + return new IOStreamPair(underlyingIn, underlyingOut); + } + + Map saslProps = createSaslPropertiesForEncryption( + dnConf.getEncryptionAlgorithm()); + + if (LOG.isDebugEnabled()) { + LOG.debug("Server using encryption algorithm " + + dnConf.getEncryptionAlgorithm()); + } + + CallbackHandler callbackHandler = new SaslServerCallbackHandler( + new PasswordFunction() { + @Override + public char[] apply(String userName) throws IOException { + return encryptionKeyToPassword(getEncryptionKeyFromUserName(userName)); + } + }); + return doSaslHandshake(underlyingOut, underlyingIn, saslProps, + callbackHandler); + } + + /** + * The SASL handshake for encrypted vs. general-purpose uses different logic + * for determining the password. This interface is used to parameterize that + * logic. It's similar to a Guava Function, but we need to let it throw + * exceptions. + */ + private interface PasswordFunction { + + /** + * Returns the SASL password for the given user name. + * + * @param userName SASL user name + * @return SASL password + * @throws IOException for any error + */ + char[] apply(String userName) throws IOException; + } + + /** + * Sets user name and password when asked by the server-side SASL object. + */ + private static final class SaslServerCallbackHandler + implements CallbackHandler { + + private final PasswordFunction passwordFunction; + + /** + * Creates a new SaslServerCallbackHandler. + * + * @param passwordFunction for determing the user's password + */ + public SaslServerCallbackHandler(PasswordFunction passwordFunction) { + this.passwordFunction = passwordFunction; + } + + @Override + public void handle(Callback[] callbacks) throws IOException, + UnsupportedCallbackException { + NameCallback nc = null; + PasswordCallback pc = null; + AuthorizeCallback ac = null; + for (Callback callback : callbacks) { + if (callback instanceof AuthorizeCallback) { + ac = (AuthorizeCallback) callback; + } else if (callback instanceof PasswordCallback) { + pc = (PasswordCallback) callback; + } else if (callback instanceof NameCallback) { + nc = (NameCallback) callback; + } else if (callback instanceof RealmCallback) { + continue; // realm is ignored + } else { + throw new UnsupportedCallbackException(callback, + "Unrecognized SASL DIGEST-MD5 Callback: " + callback); + } + } + + if (pc != null) { + pc.setPassword(passwordFunction.apply(nc.getDefaultName())); + } + + if (ac != null) { + ac.setAuthorized(true); + ac.setAuthorizedID(ac.getAuthorizationID()); + } + } + } + + /** + * Given a secret manager and a username encoded for the encrypted handshake, + * determine the encryption key. + * + * @param userName containing the keyId, blockPoolId, and nonce. + * @return secret encryption key. + * @throws IOException + */ + private byte[] getEncryptionKeyFromUserName(String userName) + throws IOException { + String[] nameComponents = userName.split(NAME_DELIMITER); + if (nameComponents.length != 3) { + throw new IOException("Provided name '" + userName + "' has " + + nameComponents.length + " components instead of the expected 3."); + } + int keyId = Integer.parseInt(nameComponents[0]); + String blockPoolId = nameComponents[1]; + byte[] nonce = Base64.decodeBase64(nameComponents[2]); + return blockPoolTokenSecretManager.retrieveDataEncryptionKey(keyId, + blockPoolId, nonce); + } + + /** + * Receives SASL negotiation for general-purpose handshake. + * + * @param peer connection peer + * @param underlyingOut connection output stream + * @param underlyingIn connection input stream + * @param datanodeId ID of DataNode accepting connection + * @return new pair of streams, wrapped after SASL negotiation + * @throws IOException for any error + */ + private IOStreamPair getSaslStreams(Peer peer, OutputStream underlyingOut, + InputStream underlyingIn, final DatanodeID datanodeId) throws IOException { + SaslPropertiesResolver saslPropsResolver = dnConf.getSaslPropsResolver(); + if (saslPropsResolver == null) { + throw new IOException(String.format("Cannot create a secured " + + "connection if DataNode listens on unprivileged port (%d) and no " + + "protection is defined in configuration property %s.", + datanodeId.getXferPort(), DFS_DATA_TRANSFER_PROTECTION_KEY)); + } + Map saslProps = saslPropsResolver.getServerProperties( + getPeerAddress(peer)); + + CallbackHandler callbackHandler = new SaslServerCallbackHandler( + new PasswordFunction() { + @Override + public char[] apply(String userName) throws IOException { + return buildServerPassword(userName); + } + }); + return doSaslHandshake(underlyingOut, underlyingIn, saslProps, + callbackHandler); + } + + /** + * Calculates the expected correct password on the server side for the + * general-purpose handshake. The password consists of the block access + * token's password (known to the DataNode via its secret manager). This + * expects that the client has supplied a user name consisting of its + * serialized block access token identifier. + * + * @param userName SASL user name containing serialized block access token + * identifier + * @return expected correct SASL password + * @throws IOException for any error + */ + private char[] buildServerPassword(String userName) throws IOException { + BlockTokenIdentifier identifier = deserializeIdentifier(userName); + byte[] tokenPassword = blockPoolTokenSecretManager.retrievePassword( + identifier); + return (new String(Base64.encodeBase64(tokenPassword, false), + Charsets.UTF_8)).toCharArray(); + } + + /** + * Deserializes a base64-encoded binary representation of a block access + * token. + * + * @param str String to deserialize + * @return BlockTokenIdentifier deserialized from str + * @throws IOException if there is any I/O error + */ + private BlockTokenIdentifier deserializeIdentifier(String str) + throws IOException { + BlockTokenIdentifier identifier = new BlockTokenIdentifier(); + identifier.readFields(new DataInputStream(new ByteArrayInputStream( + Base64.decodeBase64(str)))); + return identifier; + } + + /** + * This method actually executes the server-side SASL handshake. + * + * @param underlyingOut connection output stream + * @param underlyingIn connection input stream + * @param saslProps properties of SASL negotiation + * @param callbackHandler for responding to SASL callbacks + * @return new pair of streams, wrapped after SASL negotiation + * @throws IOException for any error + */ + private IOStreamPair doSaslHandshake(OutputStream underlyingOut, + InputStream underlyingIn, Map saslProps, + CallbackHandler callbackHandler) throws IOException { + + DataInputStream in = new DataInputStream(underlyingIn); + DataOutputStream out = new DataOutputStream(underlyingOut); + + SaslParticipant sasl = SaslParticipant.createServerSaslParticipant(saslProps, + callbackHandler); + + int magicNumber = in.readInt(); + if (magicNumber != SASL_TRANSFER_MAGIC_NUMBER) { + throw new InvalidMagicNumberException(magicNumber); + } + try { + // step 1 + performSaslStep1(out, in, sasl); + + // step 2 (server-side only) + byte[] remoteResponse = readSaslMessage(in); + byte[] localResponse = sasl.evaluateChallengeOrResponse(remoteResponse); + sendSaslMessage(out, localResponse); + + // SASL handshake is complete + checkSaslComplete(sasl, saslProps); + + return sasl.createStreamPair(out, in); + } catch (IOException ioe) { + if (ioe instanceof SaslException && + ioe.getCause() != null && + ioe.getCause() instanceof InvalidEncryptionKeyException) { + // This could just be because the client is long-lived and hasn't gotten + // a new encryption key from the NN in a while. Upon receiving this + // error, the client will get a new encryption key from the NN and retry + // connecting to this DN. + sendInvalidKeySaslErrorMessage(out, ioe.getCause().getMessage()); + } else { + sendGenericSaslErrorMessage(out, ioe.getMessage()); + } + throw ioe; + } + } + + /** + * Sends a SASL negotiation message indicating an invalid key error. + * + * @param out stream to receive message + * @param message to send + * @throws IOException for any error + */ + private static void sendInvalidKeySaslErrorMessage(DataOutputStream out, + String message) throws IOException { + sendSaslMessage(out, DataTransferEncryptorStatus.ERROR_UNKNOWN_KEY, null, + message); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslParticipant.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslParticipant.java new file mode 100644 index 00000000000..106e297d5c8 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslParticipant.java @@ -0,0 +1,166 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.protocol.datatransfer.sasl; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.util.Map; +import javax.security.auth.callback.CallbackHandler; +import javax.security.sasl.Sasl; +import javax.security.sasl.SaslClient; +import javax.security.sasl.SaslException; +import javax.security.sasl.SaslServer; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair; +import org.apache.hadoop.security.SaslInputStream; +import org.apache.hadoop.security.SaslOutputStream; + +/** + * Strongly inspired by Thrift's TSaslTransport class. + * + * Used to abstract over the SaslServer and + * SaslClient classes, which share a lot of their interface, but + * unfortunately don't share a common superclass. + */ +@InterfaceAudience.Private +class SaslParticipant { + + // This has to be set as part of the SASL spec, but it don't matter for + // our purposes, but may not be empty. It's sent over the wire, so use + // a short string. + private static final String SERVER_NAME = "0"; + private static final String PROTOCOL = "hdfs"; + private static final String MECHANISM = "DIGEST-MD5"; + + // One of these will always be null. + private final SaslServer saslServer; + private final SaslClient saslClient; + + /** + * Creates a SaslParticipant wrapping a SaslServer. + * + * @param saslProps properties of SASL negotiation + * @param callbackHandler for handling all SASL callbacks + * @return SaslParticipant wrapping SaslServer + * @throws SaslException for any error + */ + public static SaslParticipant createServerSaslParticipant( + Map saslProps, CallbackHandler callbackHandler) + throws SaslException { + return new SaslParticipant(Sasl.createSaslServer(MECHANISM, + PROTOCOL, SERVER_NAME, saslProps, callbackHandler)); + } + + /** + * Creates a SaslParticipant wrapping a SaslClient. + * + * @param userName SASL user name + * @param saslProps properties of SASL negotiation + * @param callbackHandler for handling all SASL callbacks + * @return SaslParticipant wrapping SaslClient + * @throws SaslException for any error + */ + public static SaslParticipant createClientSaslParticipant(String userName, + Map saslProps, CallbackHandler callbackHandler) + throws SaslException { + return new SaslParticipant(Sasl.createSaslClient(new String[] { MECHANISM }, + userName, PROTOCOL, SERVER_NAME, saslProps, callbackHandler)); + } + + /** + * Private constructor wrapping a SaslServer. + * + * @param saslServer to wrap + */ + private SaslParticipant(SaslServer saslServer) { + this.saslServer = saslServer; + this.saslClient = null; + } + + /** + * Private constructor wrapping a SaslClient. + * + * @param saslClient to wrap + */ + private SaslParticipant(SaslClient saslClient) { + this.saslServer = null; + this.saslClient = saslClient; + } + + /** + * @see {@link SaslServer#evaluateResponse} + * @see {@link SaslClient#evaluateChallenge} + */ + public byte[] evaluateChallengeOrResponse(byte[] challengeOrResponse) + throws SaslException { + if (saslClient != null) { + return saslClient.evaluateChallenge(challengeOrResponse); + } else { + return saslServer.evaluateResponse(challengeOrResponse); + } + } + + /** + * After successful SASL negotation, returns the negotiated quality of + * protection. + * + * @return negotiated quality of protection + */ + public String getNegotiatedQop() { + if (saslClient != null) { + return (String) saslClient.getNegotiatedProperty(Sasl.QOP); + } else { + return (String) saslServer.getNegotiatedProperty(Sasl.QOP); + } + } + + /** + * Returns true if SASL negotiation is complete. + * + * @return true if SASL negotiation is complete + */ + public boolean isComplete() { + if (saslClient != null) { + return saslClient.isComplete(); + } else { + return saslServer.isComplete(); + } + } + + /** + * Return some input/output streams that may henceforth have their + * communication encrypted, depending on the negotiated quality of protection. + * + * @param out output stream to wrap + * @param in input stream to wrap + * @return IOStreamPair wrapping the streams + */ + public IOStreamPair createStreamPair(DataOutputStream out, + DataInputStream in) { + if (saslClient != null) { + return new IOStreamPair( + new SaslInputStream(in, saslClient), + new SaslOutputStream(out, saslClient)); + } else { + return new IOStreamPair( + new SaslInputStream(in, saslServer), + new SaslOutputStream(out, saslServer)); + } + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java index 1a590a912c5..93e240c3e63 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java @@ -18,6 +18,8 @@ package org.apache.hadoop.hdfs.server.balancer; import static com.google.common.base.Preconditions.checkArgument; +import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT; +import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY; import static org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed; import java.io.BufferedInputStream; @@ -62,9 +64,11 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; -import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor; import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair; import org.apache.hadoop.hdfs.protocol.datatransfer.Sender; +import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver; +import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil; +import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; @@ -202,6 +206,7 @@ public class Balancer { private final NameNodeConnector nnc; private final BalancingPolicy policy; + private final SaslDataTransferClient saslClient; private final double threshold; // all data node lists @@ -352,19 +357,18 @@ private void dispatch() { OutputStream unbufOut = sock.getOutputStream(); InputStream unbufIn = sock.getInputStream(); - if (nnc.getDataEncryptionKey() != null) { - IOStreamPair encryptedStreams = - DataTransferEncryptor.getEncryptedStreams( - unbufOut, unbufIn, nnc.getDataEncryptionKey()); - unbufOut = encryptedStreams.out; - unbufIn = encryptedStreams.in; - } + ExtendedBlock eb = new ExtendedBlock(nnc.blockpoolID, block.getBlock()); + Token accessToken = nnc.getAccessToken(eb); + IOStreamPair saslStreams = saslClient.socketSend(sock, unbufOut, + unbufIn, nnc, accessToken, target.datanode); + unbufOut = saslStreams.out; + unbufIn = saslStreams.in; out = new DataOutputStream(new BufferedOutputStream(unbufOut, HdfsConstants.IO_FILE_BUFFER_SIZE)); in = new DataInputStream(new BufferedInputStream(unbufIn, HdfsConstants.IO_FILE_BUFFER_SIZE)); - sendRequest(out); + sendRequest(out, eb, accessToken); receiveResponse(in); bytesMoved.addAndGet(block.getNumBytes()); LOG.info("Successfully moved " + this); @@ -395,9 +399,8 @@ private void dispatch() { } /* Send a block replace request to the output stream*/ - private void sendRequest(DataOutputStream out) throws IOException { - final ExtendedBlock eb = new ExtendedBlock(nnc.blockpoolID, block.getBlock()); - final Token accessToken = nnc.getAccessToken(eb); + private void sendRequest(DataOutputStream out, ExtendedBlock eb, + Token accessToken) throws IOException { new Sender(out).replaceBlock(eb, accessToken, source.getStorageID(), proxySource.getDatanode()); } @@ -876,6 +879,12 @@ private static void checkReplicationPolicyCompatibility(Configuration conf this.maxConcurrentMovesPerNode = conf.getInt(DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY, DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT); + this.saslClient = new SaslDataTransferClient( + DataTransferSaslUtil.getSaslPropertiesResolver(conf), + TrustedChannelResolver.getInstance(conf), + conf.getBoolean( + IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY, + IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT)); } /* Given a data node set, build a network topology and decide diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java index 25784a26cfa..fb322763d76 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java @@ -34,8 +34,8 @@ import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException; import org.apache.hadoop.hdfs.protocol.ClientProtocol; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory; import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey; -import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager; import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys; @@ -50,7 +50,7 @@ * The class provides utilities for {@link Balancer} to access a NameNode */ @InterfaceAudience.Private -class NameNodeConnector { +class NameNodeConnector implements DataEncryptionKeyFactory { private static final Log LOG = Balancer.LOG; private static final Path BALANCER_ID_PATH = new Path("/system/balancer.id"); private static final int MAX_NOT_CHANGED_ITERATIONS = 5; @@ -72,7 +72,6 @@ class NameNodeConnector { private BlockTokenSecretManager blockTokenSecretManager; private Daemon keyupdaterthread; // AccessKeyUpdater thread private DataEncryptionKey encryptionKey; - private final TrustedChannelResolver trustedChannelResolver; NameNodeConnector(URI nameNodeUri, Configuration conf) throws IOException { @@ -122,7 +121,6 @@ class NameNodeConnector { if (out == null) { throw new IOException("Another balancer is running"); } - this.trustedChannelResolver = TrustedChannelResolver.getInstance(conf); } boolean shouldContinue(long dispatchBlockMoveBytes) { @@ -154,10 +152,10 @@ Token getAccessToken(ExtendedBlock eb BlockTokenSecretManager.AccessMode.COPY)); } } - - DataEncryptionKey getDataEncryptionKey() - throws IOException { - if (encryptDataTransfer && !this.trustedChannelResolver.isTrusted()) { + + @Override + public DataEncryptionKey newDataEncryptionKey() { + if (encryptDataTransfer) { synchronized (this) { if (encryptionKey == null) { encryptionKey = blockTokenSecretManager.generateDataEncryptionKey(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java index 6c0d23f3e49..acdda66b535 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java @@ -58,8 +58,8 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; +import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; -import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey; import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor; import org.apache.hadoop.hdfs.server.datanode.CachingStrategy; @@ -211,14 +211,16 @@ private static DatanodeInfo bestNode(DatanodeInfo[] nodes, boolean doRandom) } public static void streamBlockInAscii(InetSocketAddress addr, String poolId, - long blockId, Token blockToken, long genStamp, + long blockId, final Token blockToken, long genStamp, long blockSize, long offsetIntoBlock, long chunkSizeToView, JspWriter out, final Configuration conf, DFSClient.Conf dfsConf, - final DataEncryptionKey encryptionKey) + final DFSClient dfs, final SaslDataTransferClient saslClient) throws IOException { if (chunkSizeToView == 0) return; int amtToRead = (int)Math.min(chunkSizeToView, blockSize - offsetIntoBlock); + DatanodeID datanodeId = new DatanodeID(addr.getAddress().getHostAddress(), + addr.getHostName(), poolId, addr.getPort(), 0, 0, 0); BlockReader blockReader = new BlockReaderFactory(dfsConf). setInetSocketAddress(addr). setBlock(new ExtendedBlock(poolId, blockId, 0, genStamp)). @@ -229,21 +231,21 @@ public static void streamBlockInAscii(InetSocketAddress addr, String poolId, setVerifyChecksum(true). setClientName("JspHelper"). setClientCacheContext(ClientContext.getFromConf(conf)). - setDatanodeInfo(new DatanodeInfo( - new DatanodeID(addr.getAddress().getHostAddress(), - addr.getHostName(), poolId, addr.getPort(), 0, 0, 0))). + setDatanodeInfo(new DatanodeInfo(datanodeId)). setCachingStrategy(CachingStrategy.newDefaultStrategy()). setConfiguration(conf). setRemotePeerFactory(new RemotePeerFactory() { @Override - public Peer newConnectedPeer(InetSocketAddress addr) + public Peer newConnectedPeer(InetSocketAddress addr, + Token blockToken, DatanodeID datanodeId) throws IOException { Peer peer = null; Socket sock = NetUtils.getDefaultSocketFactory(conf).createSocket(); try { sock.connect(addr, HdfsServerConstants.READ_TIMEOUT); sock.setSoTimeout(HdfsServerConstants.READ_TIMEOUT); - peer = TcpPeerServer.peerFromSocketAndKey(sock, encryptionKey); + peer = TcpPeerServer.peerFromSocketAndKey(saslClient, sock, dfs, + blockToken, datanodeId); } finally { if (peer == null) { IOUtils.closeSocket(sock); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java index 4c1d39da7ba..4a36472cb00 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java @@ -52,7 +52,9 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver; +import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; +import org.apache.hadoop.security.SaslPropertiesResolver; /** * Simple class encapsulating all of the configuration that the DataNode @@ -86,6 +88,7 @@ public class DNConf { final String minimumNameNodeVersion; final String encryptionAlgorithm; + final SaslPropertiesResolver saslPropsResolver; final TrustedChannelResolver trustedChannelResolver; final long xceiverStopTimeout; @@ -168,6 +171,8 @@ public DNConf(Configuration conf) { DFS_ENCRYPT_DATA_TRANSFER_DEFAULT); this.encryptionAlgorithm = conf.get(DFS_DATA_ENCRYPTION_ALGORITHM_KEY); this.trustedChannelResolver = TrustedChannelResolver.getInstance(conf); + this.saslPropsResolver = DataTransferSaslUtil.getSaslPropertiesResolver( + conf); this.xceiverStopTimeout = conf.getLong( DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_KEY, @@ -186,7 +191,26 @@ public DNConf(Configuration conf) { String getMinimumNameNodeVersion() { return this.minimumNameNodeVersion; } - + + /** + * Returns true if encryption enabled for DataTransferProtocol. + * + * @return boolean true if encryption enabled for DataTransferProtocol + */ + public boolean getEncryptDataTransfer() { + return encryptDataTransfer; + } + + /** + * Returns encryption algorithm configured for DataTransferProtocol, or null + * if not configured. + * + * @return encryption algorithm configured for DataTransferProtocol + */ + public String getEncryptionAlgorithm() { + return encryptionAlgorithm; + } + public long getXceiverStopTimeout() { return xceiverStopTimeout; } @@ -194,4 +218,24 @@ public long getXceiverStopTimeout() { public long getMaxLockedMemory() { return maxLockedMemory; } + + /** + * Returns the SaslPropertiesResolver configured for use with + * DataTransferProtocol, or null if not configured. + * + * @return SaslPropertiesResolver configured for use with DataTransferProtocol + */ + public SaslPropertiesResolver getSaslPropsResolver() { + return saslPropsResolver; + } + + /** + * Returns the TrustedChannelResolver configured for use with + * DataTransferProtocol, or null if not configured. + * + * @return TrustedChannelResolver configured for use with DataTransferProtocol + */ + public TrustedChannelResolver getTrustedChannelResolver() { + return trustedChannelResolver; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index db3ff9c0d7d..8850f116a38 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@ -17,6 +17,9 @@ */ package org.apache.hadoop.hdfs.server.datanode; +import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT; +import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY; + import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Joiner; import com.google.common.base.Preconditions; @@ -41,6 +44,9 @@ import org.apache.hadoop.hdfs.net.TcpPeerServer; import org.apache.hadoop.hdfs.protocol.*; import org.apache.hadoop.hdfs.protocol.datatransfer.*; +import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory; +import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient; +import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferServer; import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.ClientDatanodeProtocolService; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DNTransferAckProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; @@ -227,6 +233,8 @@ public static InetSocketAddress createSocketAddr(String target) { private final List usersWithLocalPathAccess; private final boolean connectToDnViaHostname; ReadaheadPool readaheadPool; + SaslDataTransferClient saslClient; + SaslDataTransferServer saslServer; private final boolean getHdfsBlockLocationsEnabled; private ObjectName dataNodeInfoBeanName; private Thread checkDiskErrorThread = null; @@ -729,15 +737,10 @@ boolean areHeartbeatsDisabledForTests() { */ void startDataNode(Configuration conf, List dataDirs, - // DatanodeProtocol namenode, SecureResources resources ) throws IOException { - if(UserGroupInformation.isSecurityEnabled() && resources == null) { - if (!conf.getBoolean("ignore.secure.ports.for.testing", false)) { - throw new RuntimeException("Cannot start secure cluster without " - + "privileged resources."); - } - } + + checkSecureConfig(conf, resources); // settings global for all BPs in the Data Node this.secureResources = resources; @@ -797,6 +800,55 @@ void startDataNode(Configuration conf, // Create the ReadaheadPool from the DataNode context so we can // exit without having to explicitly shutdown its thread pool. readaheadPool = ReadaheadPool.getInstance(); + saslClient = new SaslDataTransferClient(dnConf.saslPropsResolver, + dnConf.trustedChannelResolver, + conf.getBoolean( + IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY, + IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT)); + saslServer = new SaslDataTransferServer(dnConf, blockPoolTokenSecretManager); + } + + /** + * Checks if the DataNode has a secure configuration if security is enabled. + * There are 2 possible configurations that are considered secure: + * 1. The server has bound to privileged ports for RPC and HTTP via + * SecureDataNodeStarter. + * 2. The configuration enables SASL on DataTransferProtocol and HTTPS (no + * plain HTTP) for the HTTP server. The SASL handshake guarantees + * authentication of the RPC server before a client transmits a secret, such + * as a block access token. Similarly, SSL guarantees authentication of the + * HTTP server before a client transmits a secret, such as a delegation + * token. + * It is not possible to run with both privileged ports and SASL on + * DataTransferProtocol. For backwards-compatibility, the connection logic + * must check if the target port is a privileged port, and if so, skip the + * SASL handshake. + * + * @param conf Configuration to check + * @param resources SecuredResources obtained for DataNode + * @throws RuntimeException if security enabled, but configuration is insecure + */ + private static void checkSecureConfig(Configuration conf, + SecureResources resources) throws RuntimeException { + if (!UserGroupInformation.isSecurityEnabled()) { + return; + } + String dataTransferProtection = conf.get(DFS_DATA_TRANSFER_PROTECTION_KEY); + if (resources != null && dataTransferProtection == null) { + return; + } + if (conf.getBoolean("ignore.secure.ports.for.testing", false)) { + return; + } + if (dataTransferProtection != null && + DFSUtil.getHttpPolicy(conf) == HttpConfig.Policy.HTTPS_ONLY && + resources == null) { + return; + } + throw new RuntimeException("Cannot start secure DataNode without " + + "configuring either privileged resources or SASL RPC data transfer " + + "protection and SSL for HTTP. Using privileged resources in " + + "combination with SASL RPC data transfer protection is not supported."); } public static String generateUuid() { @@ -1630,28 +1682,6 @@ public void run() { NetUtils.connect(sock, curTarget, dnConf.socketTimeout); sock.setSoTimeout(targets.length * dnConf.socketTimeout); - long writeTimeout = dnConf.socketWriteTimeout + - HdfsServerConstants.WRITE_TIMEOUT_EXTENSION * (targets.length-1); - OutputStream unbufOut = NetUtils.getOutputStream(sock, writeTimeout); - InputStream unbufIn = NetUtils.getInputStream(sock); - if (dnConf.encryptDataTransfer && - !dnConf.trustedChannelResolver.isTrusted(sock.getInetAddress())) { - IOStreamPair encryptedStreams = - DataTransferEncryptor.getEncryptedStreams( - unbufOut, unbufIn, - blockPoolTokenSecretManager.generateDataEncryptionKey( - b.getBlockPoolId())); - unbufOut = encryptedStreams.out; - unbufIn = encryptedStreams.in; - } - - out = new DataOutputStream(new BufferedOutputStream(unbufOut, - HdfsConstants.SMALL_BUFFER_SIZE)); - in = new DataInputStream(unbufIn); - blockSender = new BlockSender(b, 0, b.getNumBytes(), - false, false, true, DataNode.this, null, cachingStrategy); - DatanodeInfo srcNode = new DatanodeInfo(bpReg); - // // Header info // @@ -1661,6 +1691,24 @@ public void run() { EnumSet.of(BlockTokenSecretManager.AccessMode.WRITE)); } + long writeTimeout = dnConf.socketWriteTimeout + + HdfsServerConstants.WRITE_TIMEOUT_EXTENSION * (targets.length-1); + OutputStream unbufOut = NetUtils.getOutputStream(sock, writeTimeout); + InputStream unbufIn = NetUtils.getInputStream(sock); + DataEncryptionKeyFactory keyFactory = + getDataEncryptionKeyFactoryForBlock(b); + IOStreamPair saslStreams = saslClient.socketSend(sock, unbufOut, + unbufIn, keyFactory, accessToken, bpReg); + unbufOut = saslStreams.out; + unbufIn = saslStreams.in; + + out = new DataOutputStream(new BufferedOutputStream(unbufOut, + HdfsConstants.SMALL_BUFFER_SIZE)); + in = new DataInputStream(unbufIn); + blockSender = new BlockSender(b, 0, b.getNumBytes(), + false, false, true, DataNode.this, null, cachingStrategy); + DatanodeInfo srcNode = new DatanodeInfo(bpReg); + new Sender(out).writeBlock(b, accessToken, clientname, targets, srcNode, stage, 0, 0, 0, 0, blockSender.getChecksum(), cachingStrategy); @@ -1703,7 +1751,26 @@ public void run() { } } } - + + /** + * Returns a new DataEncryptionKeyFactory that generates a key from the + * BlockPoolTokenSecretManager, using the block pool ID of the given block. + * + * @param block for which the factory needs to create a key + * @return DataEncryptionKeyFactory for block's block pool ID + */ + DataEncryptionKeyFactory getDataEncryptionKeyFactoryForBlock( + final ExtendedBlock block) { + return new DataEncryptionKeyFactory() { + @Override + public DataEncryptionKey newDataEncryptionKey() { + return dnConf.encryptDataTransfer ? + blockPoolTokenSecretManager.generateDataEncryptionKey( + block.getBlockPoolId()) : null; + } + }; + } + /** * After a block becomes finalized, a datanode increases metric counter, * notifies namenode, and adds it to the block scanner diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java index a118fcb1ff3..7b730905156 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java @@ -36,11 +36,9 @@ import java.io.InputStream; import java.io.InterruptedIOException; import java.io.OutputStream; -import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.Socket; import java.net.SocketException; -import java.net.UnknownHostException; import java.nio.channels.ClosedChannelException; import java.security.MessageDigest; import java.util.Arrays; @@ -52,13 +50,12 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage; -import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor; -import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor.InvalidMagicNumberException; import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil; import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair; import org.apache.hadoop.hdfs.protocol.datatransfer.Op; import org.apache.hadoop.hdfs.protocol.datatransfer.Receiver; import org.apache.hadoop.hdfs.protocol.datatransfer.Sender; +import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientReadStatusProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumResponseProto; @@ -85,7 +82,6 @@ import org.apache.hadoop.util.DataChecksum; import com.google.common.base.Preconditions; -import com.google.common.net.InetAddresses; import com.google.protobuf.ByteString; @@ -174,24 +170,11 @@ public void run() { dataXceiverServer.addPeer(peer, Thread.currentThread()); peer.setWriteTimeout(datanode.getDnConf().socketWriteTimeout); InputStream input = socketIn; - if ((!peer.hasSecureChannel()) && dnConf.encryptDataTransfer && - !dnConf.trustedChannelResolver.isTrusted(getClientAddress(peer))){ - IOStreamPair encryptedStreams = null; - try { - encryptedStreams = DataTransferEncryptor.getEncryptedStreams(socketOut, - socketIn, datanode.blockPoolTokenSecretManager, - dnConf.encryptionAlgorithm); - } catch (InvalidMagicNumberException imne) { - LOG.info("Failed to read expected encryption handshake from client " + - "at " + peer.getRemoteAddressString() + ". Perhaps the client " + - "is running an older version of Hadoop which does not support " + - "encryption"); - return; - } - input = encryptedStreams.in; - socketOut = encryptedStreams.out; - } - input = new BufferedInputStream(input, HdfsConstants.SMALL_BUFFER_SIZE); + IOStreamPair saslStreams = datanode.saslServer.receive(peer, socketOut, + socketIn, datanode.getDatanodeId()); + input = new BufferedInputStream(saslStreams.in, + HdfsConstants.SMALL_BUFFER_SIZE); + socketOut = saslStreams.out; super.initialize(new DataInputStream(input)); @@ -263,19 +246,6 @@ public void run() { } } } - - /** - * Returns InetAddress from peer - * The getRemoteAddressString is the form /ip-address:port - * The ip-address is extracted from peer and InetAddress is formed - * @param peer - * @return - * @throws UnknownHostException - */ - private static InetAddress getClientAddress(Peer peer) { - return InetAddresses.forString( - peer.getRemoteAddressString().split(":")[0].substring(1)); - } @Override public void requestShortCircuitFds(final ExtendedBlock blk, @@ -656,17 +626,12 @@ public void writeBlock(final ExtendedBlock block, OutputStream unbufMirrorOut = NetUtils.getOutputStream(mirrorSock, writeTimeout); InputStream unbufMirrorIn = NetUtils.getInputStream(mirrorSock); - if (dnConf.encryptDataTransfer && - !dnConf.trustedChannelResolver.isTrusted(mirrorSock.getInetAddress())) { - IOStreamPair encryptedStreams = - DataTransferEncryptor.getEncryptedStreams( - unbufMirrorOut, unbufMirrorIn, - datanode.blockPoolTokenSecretManager - .generateDataEncryptionKey(block.getBlockPoolId())); - - unbufMirrorOut = encryptedStreams.out; - unbufMirrorIn = encryptedStreams.in; - } + DataEncryptionKeyFactory keyFactory = + datanode.getDataEncryptionKeyFactoryForBlock(block); + IOStreamPair saslStreams = datanode.saslClient.socketSend(mirrorSock, + unbufMirrorOut, unbufMirrorIn, keyFactory, blockToken, targets[0]); + unbufMirrorOut = saslStreams.out; + unbufMirrorIn = saslStreams.in; mirrorOut = new DataOutputStream(new BufferedOutputStream(unbufMirrorOut, HdfsConstants.SMALL_BUFFER_SIZE)); mirrorIn = new DataInputStream(unbufMirrorIn); @@ -1026,17 +991,12 @@ public void replaceBlock(final ExtendedBlock block, OutputStream unbufProxyOut = NetUtils.getOutputStream(proxySock, dnConf.socketWriteTimeout); InputStream unbufProxyIn = NetUtils.getInputStream(proxySock); - if (dnConf.encryptDataTransfer && - !dnConf.trustedChannelResolver.isTrusted( - proxySock.getInetAddress())) { - IOStreamPair encryptedStreams = - DataTransferEncryptor.getEncryptedStreams( - unbufProxyOut, unbufProxyIn, - datanode.blockPoolTokenSecretManager - .generateDataEncryptionKey(block.getBlockPoolId())); - unbufProxyOut = encryptedStreams.out; - unbufProxyIn = encryptedStreams.in; - } + DataEncryptionKeyFactory keyFactory = + datanode.getDataEncryptionKeyFactoryForBlock(block); + IOStreamPair saslStreams = datanode.saslClient.socketSend(proxySock, + unbufProxyOut, unbufProxyIn, keyFactory, blockToken, proxySource); + unbufProxyOut = saslStreams.out; + unbufProxyIn = saslStreams.in; proxyOut = new DataOutputStream(new BufferedOutputStream(unbufProxyOut, HdfsConstants.SMALL_BUFFER_SIZE)); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java index c9e548b4d06..b6fe4c63ec5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java @@ -43,6 +43,7 @@ import org.apache.hadoop.hdfs.protocol.DirectoryListing; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager; import org.apache.hadoop.hdfs.server.common.JspHelper; @@ -510,7 +511,7 @@ static void generateFileChunks(JspWriter out, HttpServletRequest req, JspHelper.streamBlockInAscii(new InetSocketAddress(req.getServerName(), datanodePort), bpid, blockId, blockToken, genStamp, blockSize, startOffset, chunkSizeToView, out, conf, dfs.getConf(), - dfs.getDataEncryptionKey()); + dfs, getSaslDataTransferClient(req)); } catch (Exception e) { out.print(e); } @@ -669,7 +670,7 @@ static void generateFileChunksForTail(JspWriter out, HttpServletRequest req, out.print(""); dfs.close(); } @@ -702,4 +703,16 @@ public static String getVersionTable(ServletContext context) { return sb.toString(); } + /** + * Gets the {@link SaslDataTransferClient} from the {@link DataNode} attached + * to the servlet context. + * + * @return SaslDataTransferClient from DataNode + */ + private static SaslDataTransferClient getSaslDataTransferClient( + HttpServletRequest req) { + DataNode dataNode = (DataNode)req.getSession().getServletContext() + .getAttribute("datanode"); + return dataNode.saslClient; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java index 9a66505436b..542e60e4016 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java @@ -17,6 +17,9 @@ */ package org.apache.hadoop.hdfs.server.namenode; +import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT; +import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY; + import java.io.FileNotFoundException; import java.io.IOException; import java.io.OutputStream; @@ -47,6 +50,7 @@ import org.apache.hadoop.hdfs.RemotePeerFactory; import org.apache.hadoop.hdfs.net.Peer; import org.apache.hadoop.hdfs.net.TcpPeerServer; +import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.DirectoryListing; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; @@ -55,6 +59,12 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus; +import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver; +import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil; +import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory; +import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient; +import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; +import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey; import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy; import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementStatus; import org.apache.hadoop.hdfs.server.blockmanagement.NumberReplicas; @@ -65,6 +75,7 @@ import org.apache.hadoop.net.NodeBase; import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.Time; import com.google.common.annotations.VisibleForTesting; @@ -92,7 +103,7 @@ * factors of each file. */ @InterfaceAudience.Private -public class NamenodeFsck { +public class NamenodeFsck implements DataEncryptionKeyFactory { public static final Log LOG = LogFactory.getLog(NameNode.class.getName()); // return string marking fsck status @@ -149,6 +160,7 @@ public class NamenodeFsck { private List snapshottableDirs = null; private final BlockPlacementPolicy bpPolicy; + private final SaslDataTransferClient saslClient; /** * Filesystem checker. @@ -175,6 +187,12 @@ public class NamenodeFsck { networktopology, namenode.getNamesystem().getBlockManager().getDatanodeManager() .getHost2DatanodeMap()); + this.saslClient = new SaslDataTransferClient( + DataTransferSaslUtil.getSaslPropertiesResolver(conf), + TrustedChannelResolver.getInstance(conf), + conf.getBoolean( + IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY, + IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT)); for (Iterator it = pmap.keySet().iterator(); it.hasNext();) { String key = it.next(); @@ -616,15 +634,16 @@ private void copyBlock(DFSClient dfs, LocatedBlock lblock, setConfiguration(namenode.conf). setRemotePeerFactory(new RemotePeerFactory() { @Override - public Peer newConnectedPeer(InetSocketAddress addr) + public Peer newConnectedPeer(InetSocketAddress addr, + Token blockToken, DatanodeID datanodeId) throws IOException { Peer peer = null; Socket s = NetUtils.getDefaultSocketFactory(conf).createSocket(); try { s.connect(addr, HdfsServerConstants.READ_TIMEOUT); s.setSoTimeout(HdfsServerConstants.READ_TIMEOUT); - peer = TcpPeerServer.peerFromSocketAndKey(s, namenode.getRpcServer(). - getDataEncryptionKey()); + peer = TcpPeerServer.peerFromSocketAndKey(saslClient, s, + NamenodeFsck.this, blockToken, datanodeId); } finally { if (peer == null) { IOUtils.closeQuietly(s); @@ -663,7 +682,12 @@ public Peer newConnectedPeer(InetSocketAddress addr) throw new Exception("Could not copy block data for " + lblock.getBlock()); } } - + + @Override + public DataEncryptionKey newDataEncryptionKey() throws IOException { + return namenode.getRpcServer().getDataEncryptionKey(); + } + /* * XXX (ab) See comment above for copyBlock(). * diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index 1a03da11785..6cca87aca71 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -1451,6 +1451,37 @@ + + dfs.data.transfer.protection + + + A comma-separated list of SASL protection values used for secured + connections to the DataNode when reading or writing block data. Possible + values are authentication, integrity and privacy. authentication means + authentication only and no integrity or privacy; integrity implies + authentication and integrity are enabled; and privacy implies all of + authentication, integrity and privacy are enabled. If + dfs.encrypt.data.transfer is set to true, then it supersedes the setting for + dfs.data.transfer.protection and enforces that all connections must use a + specialized encrypted SASL handshake. This property is ignored for + connections to a DataNode listening on a privileged port. In this case, it + is assumed that the use of a privileged port establishes sufficient trust. + + + + + dfs.data.transfer.saslproperties.resolver.class + + + SaslPropertiesResolver used to resolve the QOP used for a connection to the + DataNode when reading or writing block data. If not specified, the full set + of values specified in dfs.data.transfer.protection is used while + determining the QOP used for the connection. If a class is specified, then + the QOP values returned by the class will be used while determining the QOP + used for the connection. + + + dfs.datanode.hdfs-blocks-metadata.enabled false diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java index 0de3ab2e915..88b7f37dcce 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java @@ -33,9 +33,11 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.net.Peer; import org.apache.hadoop.hdfs.net.TcpPeerServer; +import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.server.blockmanagement.CacheReplicationMonitor; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; import org.apache.hadoop.hdfs.server.datanode.CachingStrategy; @@ -48,6 +50,7 @@ import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitReplica; import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm; import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.token.Token; import org.apache.log4j.Level; import org.apache.log4j.LogManager; @@ -192,7 +195,8 @@ public static BlockReader getBlockReader(MiniDFSCluster cluster, setAllowShortCircuitLocalReads(true). setRemotePeerFactory(new RemotePeerFactory() { @Override - public Peer newConnectedPeer(InetSocketAddress addr) + public Peer newConnectedPeer(InetSocketAddress addr, + Token blockToken, DatanodeID datanodeId) throws IOException { Peer peer = null; Socket sock = NetUtils. @@ -251,4 +255,4 @@ public static void enableShortCircuitShmTracing() { LogManager.getLogger(DataNode.class.getName()).setLevel( Level.TRACE); } -} \ No newline at end of file +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java index 98eb5f7e4b2..5b1c6e35aa9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java @@ -19,12 +19,15 @@ import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY; import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY; +import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SASL_DEFAULT; +import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SASL_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTP_ADDRESS_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_LOGROLL_PERIOD_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODE_ID_KEY; @@ -1310,15 +1313,42 @@ public synchronized void startDataNodes(Configuration conf, int numDataNodes, } SecureResources secureResources = null; - if (UserGroupInformation.isSecurityEnabled()) { + if (UserGroupInformation.isSecurityEnabled() && + conf.get(DFS_DATA_TRANSFER_PROTECTION_KEY) == null) { try { secureResources = SecureDataNodeStarter.getSecureResources(dnConf); } catch (Exception ex) { ex.printStackTrace(); } } - DataNode dn = DataNode.instantiateDataNode(dnArgs, dnConf, - secureResources); + final int maxRetriesOnSasl = conf.getInt( + IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SASL_KEY, + IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SASL_DEFAULT); + int numRetries = 0; + DataNode dn = null; + while (true) { + try { + dn = DataNode.instantiateDataNode(dnArgs, dnConf, + secureResources); + break; + } catch (IOException e) { + // Work around issue testing security where rapidly starting multiple + // DataNodes using the same principal gets rejected by the KDC as a + // replay attack. + if (UserGroupInformation.isSecurityEnabled() && + numRetries < maxRetriesOnSasl) { + try { + Thread.sleep(1000); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + break; + } + ++numRetries; + continue; + } + throw e; + } + } if(dn == null) throw new IOException("Cannot start DataNode in " + dnConf.get(DFS_DATANODE_DATA_DIR_KEY)); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferTestCase.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferTestCase.java new file mode 100644 index 00000000000..6f2b3aa88f5 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferTestCase.java @@ -0,0 +1,110 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.protocol.datatransfer.sasl; + +import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SASL_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KERBEROS_PRINCIPAL_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HTTP_POLICY_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY; +import static org.junit.Assert.*; + +import java.io.File; +import java.util.Properties; + +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.http.HttpConfig; +import org.apache.hadoop.minikdc.MiniKdc; +import org.apache.hadoop.security.SecurityUtil; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod; +import org.apache.hadoop.security.ssl.KeyStoreTestUtil; +import org.junit.AfterClass; +import org.junit.BeforeClass; + +public abstract class SaslDataTransferTestCase { + + private static File baseDir; + private static String hdfsPrincipal; + private static MiniKdc kdc; + private static String keytab; + private static String spnegoPrincipal; + + @BeforeClass + public static void initKdc() throws Exception { + baseDir = new File(System.getProperty("test.build.dir", "target/test-dir"), + SaslDataTransferTestCase.class.getSimpleName()); + FileUtil.fullyDelete(baseDir); + assertTrue(baseDir.mkdirs()); + + Properties kdcConf = MiniKdc.createConf(); + kdc = new MiniKdc(kdcConf, baseDir); + kdc.start(); + + String userName = UserGroupInformation.getLoginUser().getShortUserName(); + File keytabFile = new File(baseDir, userName + ".keytab"); + keytab = keytabFile.getAbsolutePath(); + kdc.createPrincipal(keytabFile, userName + "/localhost", "HTTP/localhost"); + hdfsPrincipal = userName + "/localhost@" + kdc.getRealm(); + spnegoPrincipal = "HTTP/localhost@" + kdc.getRealm(); + } + + @AfterClass + public static void shutdownKdc() { + if (kdc != null) { + kdc.stop(); + } + FileUtil.fullyDelete(baseDir); + } + + /** + * Creates configuration for starting a secure cluster. + * + * @param dataTransferProtection supported QOPs + * @return configuration for starting a secure cluster + * @throws Exception if there is any failure + */ + protected HdfsConfiguration createSecureConfig( + String dataTransferProtection) throws Exception { + HdfsConfiguration conf = new HdfsConfiguration(); + SecurityUtil.setAuthenticationMethod(AuthenticationMethod.KERBEROS, conf); + conf.set(DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY, hdfsPrincipal); + conf.set(DFS_NAMENODE_KEYTAB_FILE_KEY, keytab); + conf.set(DFS_DATANODE_KERBEROS_PRINCIPAL_KEY, hdfsPrincipal); + conf.set(DFS_DATANODE_KEYTAB_FILE_KEY, keytab); + conf.set(DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY, spnegoPrincipal); + conf.setBoolean(DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true); + conf.set(DFS_DATA_TRANSFER_PROTECTION_KEY, dataTransferProtection); + conf.set(DFS_HTTP_POLICY_KEY, HttpConfig.Policy.HTTPS_ONLY.name()); + conf.set(DFS_NAMENODE_HTTPS_ADDRESS_KEY, "localhost:0"); + conf.set(DFS_DATANODE_HTTPS_ADDRESS_KEY, "localhost:0"); + conf.setInt(IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SASL_KEY, 10); + + String keystoresDir = baseDir.getAbsolutePath(); + String sslConfDir = KeyStoreTestUtil.getClasspathDir(this.getClass()); + KeyStoreTestUtil.setupSSLConfig(keystoresDir, sslConfDir, conf, false); + return conf; + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/TestSaslDataTransfer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/TestSaslDataTransfer.java new file mode 100644 index 00000000000..7602f44b0b0 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/TestSaslDataTransfer.java @@ -0,0 +1,155 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.protocol.datatransfer.sasl; + +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY; +import static org.junit.Assert.*; + +import java.io.IOException; + +import org.apache.hadoop.fs.BlockLocation; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileSystemTestHelper; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DFSTestUtil; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.io.IOUtils; +import org.junit.After; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +public class TestSaslDataTransfer extends SaslDataTransferTestCase { + + private static final int BLOCK_SIZE = 4096; + private static final int BUFFER_SIZE= 1024; + private static final int NUM_BLOCKS = 3; + private static final Path PATH = new Path("/file1"); + private static final short REPLICATION = 3; + + private MiniDFSCluster cluster; + private FileSystem fs; + + @Rule + public ExpectedException exception = ExpectedException.none(); + + @After + public void shutdown() { + IOUtils.cleanup(null, fs); + if (cluster != null) { + cluster.shutdown(); + } + } + + @Test + public void testAuthentication() throws Exception { + HdfsConfiguration clusterConf = createSecureConfig( + "authentication,integrity,privacy"); + startCluster(clusterConf); + HdfsConfiguration clientConf = new HdfsConfiguration(clusterConf); + clientConf.set(DFS_DATA_TRANSFER_PROTECTION_KEY, "authentication"); + doTest(clientConf); + } + + @Test + public void testIntegrity() throws Exception { + HdfsConfiguration clusterConf = createSecureConfig( + "authentication,integrity,privacy"); + startCluster(clusterConf); + HdfsConfiguration clientConf = new HdfsConfiguration(clusterConf); + clientConf.set(DFS_DATA_TRANSFER_PROTECTION_KEY, "integrity"); + doTest(clientConf); + } + + @Test + public void testPrivacy() throws Exception { + HdfsConfiguration clusterConf = createSecureConfig( + "authentication,integrity,privacy"); + startCluster(clusterConf); + HdfsConfiguration clientConf = new HdfsConfiguration(clusterConf); + clientConf.set(DFS_DATA_TRANSFER_PROTECTION_KEY, "privacy"); + doTest(clientConf); + } + + @Test + public void testClientAndServerDoNotHaveCommonQop() throws Exception { + HdfsConfiguration clusterConf = createSecureConfig("privacy"); + startCluster(clusterConf); + HdfsConfiguration clientConf = new HdfsConfiguration(clusterConf); + clientConf.set(DFS_DATA_TRANSFER_PROTECTION_KEY, "authentication"); + exception.expect(IOException.class); + exception.expectMessage("could only be replicated to 0 nodes"); + doTest(clientConf); + } + + @Test + public void testClientSaslNoServerSasl() throws Exception { + HdfsConfiguration clusterConf = createSecureConfig(""); + startCluster(clusterConf); + HdfsConfiguration clientConf = new HdfsConfiguration(clusterConf); + clientConf.set(DFS_DATA_TRANSFER_PROTECTION_KEY, "authentication"); + exception.expect(IOException.class); + exception.expectMessage("could only be replicated to 0 nodes"); + doTest(clientConf); + } + + @Test + public void testServerSaslNoClientSasl() throws Exception { + HdfsConfiguration clusterConf = createSecureConfig( + "authentication,integrity,privacy"); + startCluster(clusterConf); + HdfsConfiguration clientConf = new HdfsConfiguration(clusterConf); + clientConf.set(DFS_DATA_TRANSFER_PROTECTION_KEY, ""); + exception.expect(IOException.class); + exception.expectMessage("could only be replicated to 0 nodes"); + doTest(clientConf); + } + + /** + * Tests DataTransferProtocol with the given client configuration. + * + * @param conf client configuration + * @throws IOException if there is an I/O error + */ + private void doTest(HdfsConfiguration conf) throws IOException { + fs = FileSystem.get(cluster.getURI(), conf); + FileSystemTestHelper.createFile(fs, PATH, NUM_BLOCKS, BLOCK_SIZE); + assertArrayEquals(FileSystemTestHelper.getFileData(NUM_BLOCKS, BLOCK_SIZE), + DFSTestUtil.readFile(fs, PATH).getBytes("UTF-8")); + BlockLocation[] blockLocations = fs.getFileBlockLocations(PATH, 0, + Long.MAX_VALUE); + assertNotNull(blockLocations); + assertEquals(NUM_BLOCKS, blockLocations.length); + for (BlockLocation blockLocation: blockLocations) { + assertNotNull(blockLocation.getHosts()); + assertEquals(3, blockLocation.getHosts().length); + } + } + + /** + * Starts a cluster with the given configuration. + * + * @param conf cluster configuration + * @throws IOException if there is an I/O error + */ + private void startCluster(HdfsConfiguration conf) throws IOException { + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build(); + cluster.waitActive(); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithSaslDataTransfer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithSaslDataTransfer.java new file mode 100644 index 00000000000..b579c8937bd --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithSaslDataTransfer.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.balancer; + +import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferTestCase; +import org.junit.Test; + +public class TestBalancerWithSaslDataTransfer extends SaslDataTransferTestCase { + + private static final TestBalancer TEST_BALANCER = new TestBalancer(); + + @Test + public void testBalancer0Authentication() throws Exception { + TEST_BALANCER.testBalancer0Internal(createSecureConfig("authentication")); + } + + @Test + public void testBalancer0Integrity() throws Exception { + TEST_BALANCER.testBalancer0Internal(createSecureConfig("integrity")); + } + + @Test + public void testBalancer0Privacy() throws Exception { + TEST_BALANCER.testBalancer0Internal(createSecureConfig("privacy")); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java index 0c3c5806e8c..b15cb380976 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java @@ -45,6 +45,7 @@ import org.apache.hadoop.hdfs.RemotePeerFactory; import org.apache.hadoop.hdfs.net.Peer; import org.apache.hadoop.hdfs.net.TcpPeerServer; +import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlock; @@ -160,7 +161,8 @@ private static void tryRead(final Configuration conf, LocatedBlock lblock, setConfiguration(conf). setRemotePeerFactory(new RemotePeerFactory() { @Override - public Peer newConnectedPeer(InetSocketAddress addr) + public Peer newConnectedPeer(InetSocketAddress addr, + Token blockToken, DatanodeID datanodeId) throws IOException { Peer peer = null; Socket sock = NetUtils.getDefaultSocketFactory(conf).createSocket(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java index 3644c33a76d..38403eb8258 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java @@ -46,9 +46,11 @@ import org.apache.hadoop.hdfs.net.TcpPeerServer; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; +import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; @@ -58,6 +60,7 @@ import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.token.Token; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -307,7 +310,8 @@ private void accessBlock(DatanodeInfo datanode, LocatedBlock lblock) setConfiguration(conf). setRemotePeerFactory(new RemotePeerFactory() { @Override - public Peer newConnectedPeer(InetSocketAddress addr) + public Peer newConnectedPeer(InetSocketAddress addr, + Token blockToken, DatanodeID datanodeId) throws IOException { Peer peer = null; Socket sock = NetUtils.getDefaultSocketFactory(conf).createSocket();