diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java index 2fa4bd0c038..7b1bfb1ecda 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java @@ -620,7 +620,8 @@ public abstract class FileSystem extends Configured implements Closeable { conf.getInt("io.bytes.per.checksum", 512), 64 * 1024, getDefaultReplication(), - conf.getInt("io.file.buffer.size", 4096)); + conf.getInt("io.file.buffer.size", 4096), + false); } /** diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FsServerDefaults.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FsServerDefaults.java index 6d5880ee900..f019593a107 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FsServerDefaults.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FsServerDefaults.java @@ -48,17 +48,20 @@ public class FsServerDefaults implements Writable { private int writePacketSize; private short replication; private int fileBufferSize; + private boolean encryptDataTransfer; public FsServerDefaults() { } public FsServerDefaults(long blockSize, int bytesPerChecksum, - int writePacketSize, short replication, int fileBufferSize) { + int writePacketSize, short replication, int fileBufferSize, + boolean encryptDataTransfer) { this.blockSize = blockSize; this.bytesPerChecksum = bytesPerChecksum; this.writePacketSize = writePacketSize; this.replication = replication; this.fileBufferSize = fileBufferSize; + this.encryptDataTransfer = encryptDataTransfer; } public long getBlockSize() { @@ -80,6 +83,10 @@ public class FsServerDefaults implements Writable { public int getFileBufferSize() { return fileBufferSize; } + + public boolean getEncryptDataTransfer() { + return encryptDataTransfer; + } // ///////////////////////////////////////// // Writable diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ftp/FtpConfigKeys.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ftp/FtpConfigKeys.java index 23c5c6392ed..b646dcaf2c1 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ftp/FtpConfigKeys.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ftp/FtpConfigKeys.java @@ -44,6 +44,7 @@ public class FtpConfigKeys extends CommonConfigurationKeys { public static final String CLIENT_WRITE_PACKET_SIZE_KEY = "ftp.client-write-packet-size"; public static final int CLIENT_WRITE_PACKET_SIZE_DEFAULT = 64*1024; + public static final boolean ENCRYPT_DATA_TRANSFER_DEFAULT = false; protected static FsServerDefaults getServerDefaults() throws IOException { return new FsServerDefaults( @@ -51,7 +52,8 @@ public class FtpConfigKeys extends CommonConfigurationKeys { BYTES_PER_CHECKSUM_DEFAULT, CLIENT_WRITE_PACKET_SIZE_DEFAULT, REPLICATION_DEFAULT, - STREAM_BUFFER_SIZE_DEFAULT); + STREAM_BUFFER_SIZE_DEFAULT, + ENCRYPT_DATA_TRANSFER_DEFAULT); } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/local/LocalConfigKeys.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/local/LocalConfigKeys.java index 0561151b0dc..da767d29dc3 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/local/LocalConfigKeys.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/local/LocalConfigKeys.java @@ -43,6 +43,7 @@ public class LocalConfigKeys extends CommonConfigurationKeys { public static final String CLIENT_WRITE_PACKET_SIZE_KEY = "file.client-write-packet-size"; public static final int CLIENT_WRITE_PACKET_SIZE_DEFAULT = 64*1024; + public static final boolean ENCRYPT_DATA_TRANSFER_DEFAULT = false; public static FsServerDefaults getServerDefaults() throws IOException { return new FsServerDefaults( @@ -50,7 +51,8 @@ public class LocalConfigKeys extends CommonConfigurationKeys { BYTES_PER_CHECKSUM_DEFAULT, CLIENT_WRITE_PACKET_SIZE_DEFAULT, REPLICATION_DEFAULT, - STREAM_BUFFER_SIZE_DEFAULT); + STREAM_BUFFER_SIZE_DEFAULT, + ENCRYPT_DATA_TRANSFER_DEFAULT); } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslInputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslInputStream.java index f89c0ace7ef..fa82664bdd2 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslInputStream.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslInputStream.java @@ -22,6 +22,8 @@ import java.io.DataInputStream; import java.io.EOFException; import java.io.InputStream; import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.ReadableByteChannel; import javax.security.sasl.Sasl; import javax.security.sasl.SaslClient; @@ -42,7 +44,7 @@ import org.apache.hadoop.classification.InterfaceStability; */ @InterfaceAudience.LimitedPrivate({"HDFS", "MapReduce"}) @InterfaceStability.Evolving -public class SaslInputStream extends InputStream { +public class SaslInputStream extends InputStream implements ReadableByteChannel { public static final Log LOG = LogFactory.getLog(SaslInputStream.class); private final DataInputStream inStream; @@ -65,6 +67,8 @@ public class SaslInputStream extends InputStream { private int ostart = 0; // position of the last "new" byte private int ofinish = 0; + // whether or not this stream is open + private boolean isOpen = true; private static int unsignedBytesToInt(byte[] buf) { if (buf.length != 4) { @@ -330,6 +334,7 @@ public class SaslInputStream extends InputStream { ostart = 0; ofinish = 0; inStream.close(); + isOpen = false; } /** @@ -342,4 +347,28 @@ public class SaslInputStream extends InputStream { public boolean markSupported() { return false; } + + @Override + public boolean isOpen() { + return isOpen; + } + + @Override + public int read(ByteBuffer dst) throws IOException { + int bytesRead = 0; + if (dst.hasArray()) { + bytesRead = read(dst.array(), dst.arrayOffset() + dst.position(), + dst.remaining()); + if (bytesRead > -1) { + dst.position(dst.position() + bytesRead); + } + } else { + byte[] buf = new byte[dst.remaining()]; + bytesRead = read(buf); + if (bytesRead > -1) { + dst.put(buf, 0, bytesRead); + } + } + return bytesRead; + } } \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 507313dd7c3..61769198fdd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -21,6 +21,8 @@ Release 2.0.1-alpha - UNRELEASED HDFS-3513. HttpFS should cache filesystems. (tucu) + HDFS-3637. Add support for encrypting the DataTransferProtocol. (atm) + IMPROVEMENTS HDFS-3390. DFSAdmin should print full stack traces of errors when DEBUG diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java index 6b5b92839b7..cd6dc2d25ed 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.net.Socket; import org.apache.hadoop.fs.ByteBufferReadable; +import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair; /** * A BlockReader is responsible for reading a single block @@ -71,4 +72,8 @@ public interface BlockReader extends ByteBufferReadable { */ boolean hasSentStatusCode(); + /** + * @return a reference to the streams this block reader is using. + */ + IOStreamPair getStreams(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java index 855d7ece795..c71f1ced6a6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java @@ -25,7 +25,12 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.DFSClient.Conf; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor; +import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair; +import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; +import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; +import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.token.Token; @@ -41,12 +46,13 @@ public class BlockReaderFactory { Configuration conf, Socket sock, String file, ExtendedBlock block, Token blockToken, - long startOffset, long len) throws IOException { + long startOffset, long len, DataEncryptionKey encryptionKey) + throws IOException { int bufferSize = conf.getInt(DFSConfigKeys.IO_FILE_BUFFER_SIZE_KEY, DFSConfigKeys.IO_FILE_BUFFER_SIZE_DEFAULT); return newBlockReader(new Conf(conf), sock, file, block, blockToken, startOffset, - len, bufferSize, true, ""); + len, bufferSize, true, "", encryptionKey, null); } /** @@ -73,14 +79,32 @@ public class BlockReaderFactory { Token blockToken, long startOffset, long len, int bufferSize, boolean verifyChecksum, - String clientName) + String clientName, + DataEncryptionKey encryptionKey, + IOStreamPair ioStreams) throws IOException { + if (conf.useLegacyBlockReader) { + if (encryptionKey != null) { + throw new RuntimeException("Encryption is not supported with the legacy block reader."); + } return RemoteBlockReader.newBlockReader( sock, file, block, blockToken, startOffset, len, bufferSize, verifyChecksum, clientName); } else { + if (ioStreams == null) { + ioStreams = new IOStreamPair(NetUtils.getInputStream(sock), + NetUtils.getOutputStream(sock, HdfsServerConstants.WRITE_TIMEOUT)); + if (encryptionKey != null) { + IOStreamPair encryptedStreams = + DataTransferEncryptor.getEncryptedStreams( + ioStreams.out, ioStreams.in, encryptionKey); + ioStreams = encryptedStreams; + } + } + return RemoteBlockReader2.newBlockReader( - sock, file, block, blockToken, startOffset, len, bufferSize, verifyChecksum, clientName); + sock, file, block, blockToken, startOffset, len, bufferSize, + verifyChecksum, clientName, encryptionKey, ioStreams); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java index 40b40c0841b..6db6e75198b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java @@ -35,6 +35,7 @@ import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo; import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader; import org.apache.hadoop.hdfs.util.DirectBufferPool; @@ -681,4 +682,9 @@ class BlockReaderLocal implements BlockReader { public boolean hasSentStatusCode() { return false; } + + @Override + public IOStreamPair getStreams() { + return null; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java index 833bbb6073c..307237c523e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java @@ -45,6 +45,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKRE import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_KEY; @@ -53,6 +55,7 @@ import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.FileNotFoundException; import java.io.IOException; +import java.io.InputStream; import java.io.OutputStream; import java.net.InetAddress; import java.net.InetSocketAddress; @@ -109,12 +112,15 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException; import org.apache.hadoop.hdfs.protocol.UnresolvedPathException; +import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor; +import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair; import org.apache.hadoop.hdfs.protocol.datatransfer.Op; import org.apache.hadoop.hdfs.protocol.datatransfer.ReplaceDatanodeOnFailure; import org.apache.hadoop.hdfs.protocol.datatransfer.Sender; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumResponseProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; +import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException; import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; @@ -182,6 +188,7 @@ public class DFSClient implements java.io.Closeable { final Conf dfsClientConf; private Random r = new Random(); private SocketAddress[] localInterfaceAddrs; + private DataEncryptionKey encryptionKey; /** * DFSClient configuration @@ -351,9 +358,6 @@ public class DFSClient implements java.io.Closeable { this.clientName = "DFSClient_" + dfsClientConf.taskId + "_" + DFSUtil.getRandom().nextInt() + "_" + Thread.currentThread().getId(); - this.socketCache = new SocketCache(dfsClientConf.socketCacheCapacity); - - if (rpcNamenode != null) { // This case is used for testing. Preconditions.checkArgument(nameNodeUri == null); @@ -383,6 +387,8 @@ public class DFSClient implements java.io.Closeable { Joiner.on(',').join(localInterfaces)+ "] with addresses [" + Joiner.on(',').join(localInterfaceAddrs) + "]"); } + + this.socketCache = new SocketCache(dfsClientConf.socketCacheCapacity); } /** @@ -1457,7 +1463,44 @@ public class DFSClient implements java.io.Closeable { */ public MD5MD5CRC32FileChecksum getFileChecksum(String src) throws IOException { checkOpen(); - return getFileChecksum(src, namenode, socketFactory, dfsClientConf.socketTimeout); + return getFileChecksum(src, namenode, socketFactory, + dfsClientConf.socketTimeout, getDataEncryptionKey()); + } + + @InterfaceAudience.Private + public void clearDataEncryptionKey() { + LOG.debug("Clearing encryption key"); + synchronized (this) { + encryptionKey = null; + } + } + + /** + * @return true if data sent between this client and DNs should be encrypted, + * false otherwise. + * @throws IOException in the event of error communicating with the NN + */ + boolean shouldEncryptData() throws IOException { + FsServerDefaults d = getServerDefaults(); + return d == null ? false : d.getEncryptDataTransfer(); + } + + @InterfaceAudience.Private + public DataEncryptionKey getDataEncryptionKey() + throws IOException { + if (shouldEncryptData()) { + synchronized (this) { + if (encryptionKey == null || + (encryptionKey != null && + encryptionKey.expiryDate < Time.now())) { + LOG.debug("Getting new encryption token from NN"); + encryptionKey = namenode.getDataEncryptionKey(); + } + return encryptionKey; + } + } else { + return null; + } } /** @@ -1466,8 +1509,8 @@ public class DFSClient implements java.io.Closeable { * @return The checksum */ public static MD5MD5CRC32FileChecksum getFileChecksum(String src, - ClientProtocol namenode, SocketFactory socketFactory, int socketTimeout - ) throws IOException { + ClientProtocol namenode, SocketFactory socketFactory, int socketTimeout, + DataEncryptionKey encryptionKey) throws IOException { //get all block locations LocatedBlocks blockLocations = callGetBlockLocations(namenode, src, 0, Long.MAX_VALUE); if (null == blockLocations) { @@ -1510,10 +1553,18 @@ public class DFSClient implements java.io.Closeable { timeout); sock.setSoTimeout(timeout); - out = new DataOutputStream( - new BufferedOutputStream(NetUtils.getOutputStream(sock), - HdfsConstants.SMALL_BUFFER_SIZE)); - in = new DataInputStream(NetUtils.getInputStream(sock)); + OutputStream unbufOut = NetUtils.getOutputStream(sock); + InputStream unbufIn = NetUtils.getInputStream(sock); + if (encryptionKey != null) { + IOStreamPair encryptedStreams = + DataTransferEncryptor.getEncryptedStreams( + unbufOut, unbufIn, encryptionKey); + unbufOut = encryptedStreams.out; + unbufIn = encryptedStreams.in; + } + out = new DataOutputStream(new BufferedOutputStream(unbufOut, + HdfsConstants.SMALL_BUFFER_SIZE)); + in = new DataInputStream(unbufIn); if (LOG.isDebugEnabled()) { LOG.debug("write to " + datanodes[j] + ": " diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index 2c3a422cc8b..c23005830a3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -367,4 +367,9 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final boolean DFS_HA_AUTO_FAILOVER_ENABLED_DEFAULT = false; public static final String DFS_HA_ZKFC_PORT_KEY = "dfs.ha.zkfc.port"; public static final int DFS_HA_ZKFC_PORT_DEFAULT = 8019; + + // Security-related configs + public static final String DFS_ENCRYPT_DATA_TRANSFER_KEY = "dfs.encrypt.data.transfer"; + public static final boolean DFS_ENCRYPT_DATA_TRANSFER_DEFAULT = false; + public static final String DFS_DATA_ENCRYPTION_ALGORITHM_KEY = "dfs.encrypt.data.transfer.algorithm"; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java index f83a858533d..91026bea9a5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java @@ -37,11 +37,14 @@ import org.apache.hadoop.fs.ChecksumException; import org.apache.hadoop.fs.ByteBufferReadable; import org.apache.hadoop.fs.FSInputStream; import org.apache.hadoop.fs.UnresolvedLinkException; +import org.apache.hadoop.hdfs.SocketCache.SocketAndStreams; import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; +import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair; +import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException; import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException; @@ -425,6 +428,7 @@ public class DFSInputStream extends FSInputStream implements ByteBufferReadable // DatanodeInfo chosenNode = null; int refetchToken = 1; // only need to get a new access token once + int refetchEncryptionKey = 1; // only need to get a new encryption key once boolean connectFailedOnce = false; @@ -452,7 +456,14 @@ public class DFSInputStream extends FSInputStream implements ByteBufferReadable } return chosenNode; } catch (IOException ex) { - if (ex instanceof InvalidBlockTokenException && refetchToken > 0) { + if (ex instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) { + DFSClient.LOG.info("Will fetch a new encryption key and retry, " + + "encryption key was invalid when connecting to " + targetAddr + + " : " + ex); + // The encryption key used is invalid. + refetchEncryptionKey--; + dfsClient.clearDataEncryptionKey(); + } else if (ex instanceof InvalidBlockTokenException && refetchToken > 0) { DFSClient.LOG.info("Will fetch a new access token and retry, " + "access token was invalid when connecting to " + targetAddr + " : " + ex); @@ -754,6 +765,7 @@ public class DFSInputStream extends FSInputStream implements ByteBufferReadable // Connect to best DataNode for desired Block, with potential offset // int refetchToken = 1; // only need to get a new access token once + int refetchEncryptionKey = 1; // only need to get a new encryption key once while (true) { // cached block locations may have been updated by chooseDataNode() @@ -789,7 +801,14 @@ public class DFSInputStream extends FSInputStream implements ByteBufferReadable dfsClient.disableShortCircuit(); continue; } catch (IOException e) { - if (e instanceof InvalidBlockTokenException && refetchToken > 0) { + if (e instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) { + DFSClient.LOG.info("Will fetch a new encryption key and retry, " + + "encryption key was invalid when connecting to " + targetAddr + + " : " + e); + // The encryption key used is invalid. + refetchEncryptionKey--; + dfsClient.clearDataEncryptionKey(); + } else if (e instanceof InvalidBlockTokenException && refetchToken > 0) { DFSClient.LOG.info("Will get a new access token and retry, " + "access token was invalid when connecting to " + targetAddr + " : " + e); @@ -818,8 +837,9 @@ public class DFSInputStream extends FSInputStream implements ByteBufferReadable */ private void closeBlockReader(BlockReader reader) throws IOException { if (reader.hasSentStatusCode()) { + IOStreamPair ioStreams = reader.getStreams(); Socket oldSock = reader.takeSocket(); - socketCache.put(oldSock); + socketCache.put(oldSock, ioStreams); } reader.close(); } @@ -864,14 +884,15 @@ public class DFSInputStream extends FSInputStream implements ByteBufferReadable // Allow retry since there is no way of knowing whether the cached socket // is good until we actually use it. for (int retries = 0; retries <= nCachedConnRetry && fromCache; ++retries) { - Socket sock = null; + SocketAndStreams sockAndStreams = null; // Don't use the cache on the last attempt - it's possible that there // are arbitrarily many unusable sockets in the cache, but we don't // want to fail the read. if (retries < nCachedConnRetry) { - sock = socketCache.get(dnAddr); + sockAndStreams = socketCache.get(dnAddr); } - if (sock == null) { + Socket sock; + if (sockAndStreams == null) { fromCache = false; sock = dfsClient.socketFactory.createSocket(); @@ -895,6 +916,8 @@ public class DFSInputStream extends FSInputStream implements ByteBufferReadable dfsClient.getRandomLocalInterfaceAddr(), dfsClient.getConf().socketTimeout); sock.setSoTimeout(dfsClient.getConf().socketTimeout); + } else { + sock = sockAndStreams.sock; } try { @@ -905,12 +928,18 @@ public class DFSInputStream extends FSInputStream implements ByteBufferReadable blockToken, startOffset, len, bufferSize, verifyChecksum, - clientName); + clientName, + dfsClient.getDataEncryptionKey(), + sockAndStreams == null ? null : sockAndStreams.ioStreams); return reader; } catch (IOException ex) { // Our socket is no good. DFSClient.LOG.debug("Error making BlockReader. Closing stale " + sock, ex); - sock.close(); + if (sockAndStreams != null) { + sockAndStreams.close(); + } else { + sock.close(); + } err = ex; } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java index 42e7bbee647..b7db3147b50 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java @@ -24,7 +24,9 @@ import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.FileNotFoundException; import java.io.IOException; +import java.io.InputStream; import java.io.InterruptedIOException; +import java.io.OutputStream; import java.net.InetSocketAddress; import java.net.Socket; import java.nio.BufferOverflowException; @@ -56,6 +58,9 @@ import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException; import org.apache.hadoop.hdfs.protocol.UnresolvedPathException; import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage; import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol; +import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor; +import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair; +import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException; import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader; import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck; import org.apache.hadoop.hdfs.protocol.datatransfer.Sender; @@ -867,16 +872,26 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable { try { sock = createSocketForPipeline(src, 2, dfsClient); final long writeTimeout = dfsClient.getDatanodeWriteTimeout(2); - out = new DataOutputStream(new BufferedOutputStream( - NetUtils.getOutputStream(sock, writeTimeout), + + OutputStream unbufOut = NetUtils.getOutputStream(sock, writeTimeout); + InputStream unbufIn = NetUtils.getInputStream(sock); + if (dfsClient.shouldEncryptData()) { + IOStreamPair encryptedStreams = + DataTransferEncryptor.getEncryptedStreams( + unbufOut, unbufIn, dfsClient.getDataEncryptionKey()); + unbufOut = encryptedStreams.out; + unbufIn = encryptedStreams.in; + } + out = new DataOutputStream(new BufferedOutputStream(unbufOut, HdfsConstants.SMALL_BUFFER_SIZE)); + in = new DataInputStream(unbufIn); //send the TRANSFER_BLOCK request new Sender(out).transferBlock(block, blockToken, dfsClient.clientName, targets); + out.flush(); //ack - in = new DataInputStream(NetUtils.getInputStream(sock)); BlockOpResponseProto response = BlockOpResponseProto.parseFrom(HdfsProtoUtil.vintPrefixed(in)); if (SUCCESS != response.getStatus()) { @@ -1034,77 +1049,98 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable { // persist blocks on namenode on next flush persistBlocks.set(true); - boolean result = false; - DataOutputStream out = null; - try { - assert null == s : "Previous socket unclosed"; - s = createSocketForPipeline(nodes[0], nodes.length, dfsClient); - long writeTimeout = dfsClient.getDatanodeWriteTimeout(nodes.length); - - // - // Xmit header info to datanode - // - out = new DataOutputStream(new BufferedOutputStream( - NetUtils.getOutputStream(s, writeTimeout), - HdfsConstants.SMALL_BUFFER_SIZE)); - - assert null == blockReplyStream : "Previous blockReplyStream unclosed"; - blockReplyStream = new DataInputStream(NetUtils.getInputStream(s)); - - // send the request - new Sender(out).writeBlock(block, accessToken, dfsClient.clientName, - nodes, null, recoveryFlag? stage.getRecoveryStage() : stage, - nodes.length, block.getNumBytes(), bytesSent, newGS, checksum); - - // receive ack for connect - BlockOpResponseProto resp = BlockOpResponseProto.parseFrom( - HdfsProtoUtil.vintPrefixed(blockReplyStream)); - pipelineStatus = resp.getStatus(); - firstBadLink = resp.getFirstBadLink(); - - if (pipelineStatus != SUCCESS) { - if (pipelineStatus == Status.ERROR_ACCESS_TOKEN) { - throw new InvalidBlockTokenException( - "Got access token error for connect ack with firstBadLink as " - + firstBadLink); - } else { - throw new IOException("Bad connect ack with firstBadLink as " - + firstBadLink); + int refetchEncryptionKey = 1; + while (true) { + boolean result = false; + DataOutputStream out = null; + try { + assert null == s : "Previous socket unclosed"; + assert null == blockReplyStream : "Previous blockReplyStream unclosed"; + s = createSocketForPipeline(nodes[0], nodes.length, dfsClient); + long writeTimeout = dfsClient.getDatanodeWriteTimeout(nodes.length); + + OutputStream unbufOut = NetUtils.getOutputStream(s, writeTimeout); + InputStream unbufIn = NetUtils.getInputStream(s); + if (dfsClient.shouldEncryptData()) { + IOStreamPair encryptedStreams = + DataTransferEncryptor.getEncryptedStreams(unbufOut, + unbufIn, dfsClient.getDataEncryptionKey()); + unbufOut = encryptedStreams.out; + unbufIn = encryptedStreams.in; } - } - assert null == blockStream : "Previous blockStream unclosed"; - blockStream = out; - result = true; // success - - } catch (IOException ie) { - - DFSClient.LOG.info("Exception in createBlockOutputStream", ie); - - // find the datanode that matches - if (firstBadLink.length() != 0) { - for (int i = 0; i < nodes.length; i++) { - if (nodes[i].getXferAddr().equals(firstBadLink)) { - errorIndex = i; - break; + out = new DataOutputStream(new BufferedOutputStream(unbufOut, + HdfsConstants.SMALL_BUFFER_SIZE)); + blockReplyStream = new DataInputStream(unbufIn); + + // + // Xmit header info to datanode + // + + // send the request + new Sender(out).writeBlock(block, accessToken, dfsClient.clientName, + nodes, null, recoveryFlag? stage.getRecoveryStage() : stage, + nodes.length, block.getNumBytes(), bytesSent, newGS, checksum); + + // receive ack for connect + BlockOpResponseProto resp = BlockOpResponseProto.parseFrom( + HdfsProtoUtil.vintPrefixed(blockReplyStream)); + pipelineStatus = resp.getStatus(); + firstBadLink = resp.getFirstBadLink(); + + if (pipelineStatus != SUCCESS) { + if (pipelineStatus == Status.ERROR_ACCESS_TOKEN) { + throw new InvalidBlockTokenException( + "Got access token error for connect ack with firstBadLink as " + + firstBadLink); + } else { + throw new IOException("Bad connect ack with firstBadLink as " + + firstBadLink); } } - } else { - errorIndex = 0; - } - hasError = true; - setLastException(ie); - result = false; // error - } finally { - if (!result) { - IOUtils.closeSocket(s); - s = null; - IOUtils.closeStream(out); - out = null; - IOUtils.closeStream(blockReplyStream); - blockReplyStream = null; + assert null == blockStream : "Previous blockStream unclosed"; + blockStream = out; + result = true; // success + + } catch (IOException ie) { + DFSClient.LOG.info("Exception in createBlockOutputStream", ie); + if (ie instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) { + DFSClient.LOG.info("Will fetch a new encryption key and retry, " + + "encryption key was invalid when connecting to " + + nodes[0].getXferAddr() + " : " + ie); + // The encryption key used is invalid. + refetchEncryptionKey--; + dfsClient.clearDataEncryptionKey(); + // Don't close the socket/exclude this node just yet. Try again with + // a new encryption key. + continue; + } + + // find the datanode that matches + if (firstBadLink.length() != 0) { + for (int i = 0; i < nodes.length; i++) { + if (nodes[i].getXferAddr().equals(firstBadLink)) { + errorIndex = i; + break; + } + } + } else { + errorIndex = 0; + } + hasError = true; + setLastException(ie); + result = false; // error + } finally { + if (!result) { + IOUtils.closeSocket(s); + s = null; + IOUtils.closeStream(out); + out = null; + IOUtils.closeStream(blockReplyStream); + blockReplyStream = null; + } } + return result; } - return result; } private LocatedBlock locateFollowingBlock(long start, diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java index c5c0e295b54..7a95626afd2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java @@ -33,6 +33,7 @@ import org.apache.hadoop.fs.FSInputChecker; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil; +import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair; import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader; import org.apache.hadoop.hdfs.protocol.datatransfer.Sender; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto; @@ -458,7 +459,9 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader { void sendReadResult(Socket sock, Status statusCode) { assert !sentStatusCode : "already sent status code to " + sock; try { - RemoteBlockReader2.writeReadResult(sock, statusCode); + RemoteBlockReader2.writeReadResult( + NetUtils.getOutputStream(sock, HdfsServerConstants.WRITE_TIMEOUT), + statusCode); sentStatusCode = true; } catch (IOException e) { // It's ok not to be able to send this. But something is probably wrong. @@ -484,4 +487,11 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader { throw new UnsupportedOperationException("readDirect unsupported in RemoteBlockReader"); } + @Override + public IOStreamPair getStreams() { + // This class doesn't support encryption, which is the only thing this + // method is used for. See HDFS-3637. + return null; + } + } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java index fe4dc55c8d3..e1e9ca5e81a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java @@ -23,6 +23,7 @@ import java.io.BufferedOutputStream; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; +import java.io.InputStream; import java.io.OutputStream; import java.net.InetSocketAddress; import java.net.Socket; @@ -35,12 +36,15 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.fs.ChecksumException; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil; +import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor; +import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair; import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader; import org.apache.hadoop.hdfs.protocol.datatransfer.Sender; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientReadStatusProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReadOpChecksumInfoProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; +import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; @@ -83,7 +87,9 @@ public class RemoteBlockReader2 implements BlockReader { static final Log LOG = LogFactory.getLog(RemoteBlockReader2.class); - Socket dnSock; //for now just sending the status code (e.g. checksumOk) after the read. + Socket dnSock; + // for now just sending the status code (e.g. checksumOk) after the read. + private IOStreamPair ioStreams; private final ReadableByteChannel in; private DataChecksum checksum; @@ -206,9 +212,9 @@ public class RemoteBlockReader2 implements BlockReader { if (bytesNeededToFinish <= 0) { readTrailingEmptyPacket(); if (verifyChecksum) { - sendReadResult(dnSock, Status.CHECKSUM_OK); + sendReadResult(Status.CHECKSUM_OK); } else { - sendReadResult(dnSock, Status.SUCCESS); + sendReadResult(Status.SUCCESS); } } } @@ -292,9 +298,11 @@ public class RemoteBlockReader2 implements BlockReader { protected RemoteBlockReader2(String file, String bpid, long blockId, ReadableByteChannel in, DataChecksum checksum, boolean verifyChecksum, - long startOffset, long firstChunkOffset, long bytesToRead, Socket dnSock) { + long startOffset, long firstChunkOffset, long bytesToRead, Socket dnSock, + IOStreamPair ioStreams) { // Path is used only for printing block and file information in debug this.dnSock = dnSock; + this.ioStreams = ioStreams; this.in = in; this.checksum = checksum; this.verifyChecksum = verifyChecksum; @@ -369,24 +377,23 @@ public class RemoteBlockReader2 implements BlockReader { * closing our connection (which we will re-open), but won't affect * data correctness. */ - void sendReadResult(Socket sock, Status statusCode) { - assert !sentStatusCode : "already sent status code to " + sock; + void sendReadResult(Status statusCode) { + assert !sentStatusCode : "already sent status code to " + dnSock; try { - writeReadResult(sock, statusCode); + writeReadResult(ioStreams.out, statusCode); sentStatusCode = true; } catch (IOException e) { // It's ok not to be able to send this. But something is probably wrong. LOG.info("Could not send read status (" + statusCode + ") to datanode " + - sock.getInetAddress() + ": " + e.getMessage()); + dnSock.getInetAddress() + ": " + e.getMessage()); } } /** * Serialize the actual read result on the wire. */ - static void writeReadResult(Socket sock, Status statusCode) + static void writeReadResult(OutputStream out, Status statusCode) throws IOException { - OutputStream out = NetUtils.getOutputStream(sock, HdfsServerConstants.WRITE_TIMEOUT); ClientReadStatusProto.newBuilder() .setStatus(statusCode) @@ -434,25 +441,32 @@ public class RemoteBlockReader2 implements BlockReader { * @param clientName Client name * @return New BlockReader instance, or null on error. */ - public static BlockReader newBlockReader( Socket sock, String file, + public static BlockReader newBlockReader(Socket sock, String file, ExtendedBlock block, Token blockToken, long startOffset, long len, int bufferSize, boolean verifyChecksum, - String clientName) + String clientName, + DataEncryptionKey encryptionKey, + IOStreamPair ioStreams) throws IOException { + + ReadableByteChannel ch; + if (ioStreams.in instanceof SocketInputWrapper) { + ch = ((SocketInputWrapper)ioStreams.in).getReadableByteChannel(); + } else { + ch = (ReadableByteChannel) ioStreams.in; + } + // in and out will be closed when sock is closed (by the caller) final DataOutputStream out = new DataOutputStream(new BufferedOutputStream( - NetUtils.getOutputStream(sock, - HdfsServerConstants.WRITE_TIMEOUT))); + ioStreams.out)); new Sender(out).readBlock(block, blockToken, clientName, startOffset, len); // - // Get bytes in block, set streams + // Get bytes in block // - SocketInputWrapper sin = NetUtils.getInputStream(sock); - ReadableByteChannel ch = sin.getReadableByteChannel(); - DataInputStream in = new DataInputStream(sin); + DataInputStream in = new DataInputStream(ioStreams.in); BlockOpResponseProto status = BlockOpResponseProto.parseFrom( vintPrefixed(in)); @@ -474,7 +488,8 @@ public class RemoteBlockReader2 implements BlockReader { } return new RemoteBlockReader2(file, block.getBlockPoolId(), block.getBlockId(), - ch, checksum, verifyChecksum, startOffset, firstChunkOffset, len, sock); + ch, checksum, verifyChecksum, startOffset, firstChunkOffset, len, sock, + ioStreams); } static void checkSuccess( @@ -498,4 +513,9 @@ public class RemoteBlockReader2 implements BlockReader { } } } + + @Override + public IOStreamPair getStreams() { + return ioStreams; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/SocketCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/SocketCache.java index 548d0d8f9b9..2fa7b55d440 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/SocketCache.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/SocketCache.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hdfs; +import java.io.Closeable; import java.net.Socket; import java.net.SocketAddress; @@ -29,6 +30,8 @@ import com.google.common.base.Preconditions; import com.google.common.collect.LinkedListMultimap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair; import org.apache.hadoop.io.IOUtils; /** @@ -37,7 +40,7 @@ import org.apache.hadoop.io.IOUtils; class SocketCache { static final Log LOG = LogFactory.getLog(SocketCache.class); - private final LinkedListMultimap multimap; + private final LinkedListMultimap multimap; private final int capacity; /** @@ -57,21 +60,21 @@ class SocketCache { * @param remote Remote address the socket is connected to. * @return A socket with unknown state, possibly closed underneath. Or null. */ - public synchronized Socket get(SocketAddress remote) { + public synchronized SocketAndStreams get(SocketAddress remote) { if (capacity <= 0) { // disabled return null; } - List socklist = multimap.get(remote); + List socklist = multimap.get(remote); if (socklist == null) { return null; } - Iterator iter = socklist.iterator(); + Iterator iter = socklist.iterator(); while (iter.hasNext()) { - Socket candidate = iter.next(); + SocketAndStreams candidate = iter.next(); iter.remove(); - if (!candidate.isClosed()) { + if (!candidate.sock.isClosed()) { return candidate; } } @@ -82,10 +85,11 @@ class SocketCache { * Give an unused socket to the cache. * @param sock socket not used by anyone. */ - public synchronized void put(Socket sock) { + public synchronized void put(Socket sock, IOStreamPair ioStreams) { + SocketAndStreams s = new SocketAndStreams(sock, ioStreams); if (capacity <= 0) { // Cache disabled. - IOUtils.closeSocket(sock); + s.close(); return; } @@ -102,7 +106,7 @@ class SocketCache { if (capacity == multimap.size()) { evictOldest(); } - multimap.put(remoteAddr, sock); + multimap.put(remoteAddr, new SocketAndStreams(sock, ioStreams)); } public synchronized int size() { @@ -113,23 +117,23 @@ class SocketCache { * Evict the oldest entry in the cache. */ private synchronized void evictOldest() { - Iterator> iter = + Iterator> iter = multimap.entries().iterator(); if (!iter.hasNext()) { throw new IllegalStateException("Cannot evict from empty cache!"); } - Entry entry = iter.next(); + Entry entry = iter.next(); iter.remove(); - Socket sock = entry.getValue(); - IOUtils.closeSocket(sock); + SocketAndStreams s = entry.getValue(); + s.close(); } /** * Empty the cache, and close all sockets. */ public synchronized void clear() { - for (Socket sock : multimap.values()) { - IOUtils.closeSocket(sock); + for (SocketAndStreams s : multimap.values()) { + s.close(); } multimap.clear(); } @@ -138,5 +142,25 @@ class SocketCache { protected void finalize() { clear(); } + + @InterfaceAudience.Private + static class SocketAndStreams implements Closeable { + public final Socket sock; + public final IOStreamPair ioStreams; + + public SocketAndStreams(Socket s, IOStreamPair ioStreams) { + this.sock = s; + this.ioStreams = ioStreams; + } + + @Override + public void close() { + if (ioStreams != null) { + IOUtils.closeStream(ioStreams.in); + IOUtils.closeStream(ioStreams.out); + } + IOUtils.closeSocket(sock); + } + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java index 9a024656029..6aadaa9ad8a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java @@ -44,6 +44,7 @@ import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.KerberosInfo; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenInfo; +import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey; import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSelector; @@ -941,4 +942,11 @@ public interface ClientProtocol { */ public void cancelDelegationToken(Token token) throws IOException; + + /** + * @return encryption key so a client can encrypt data sent via the + * DataTransferProtocol to/from DataNodes. + * @throws IOException + */ + public DataEncryptionKey getDataEncryptionKey() throws IOException; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferEncryptor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferEncryptor.java new file mode 100644 index 00000000000..ce81135a40c --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferEncryptor.java @@ -0,0 +1,505 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.protocol.datatransfer; + +import static org.apache.hadoop.hdfs.protocol.HdfsProtoUtil.vintPrefixed; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.Map; +import java.util.TreeMap; + +import javax.security.auth.callback.Callback; +import javax.security.auth.callback.CallbackHandler; +import javax.security.auth.callback.NameCallback; +import javax.security.auth.callback.PasswordCallback; +import javax.security.auth.callback.UnsupportedCallbackException; +import javax.security.sasl.AuthorizeCallback; +import javax.security.sasl.RealmCallback; +import javax.security.sasl.RealmChoiceCallback; +import javax.security.sasl.Sasl; +import javax.security.sasl.SaslClient; +import javax.security.sasl.SaslException; +import javax.security.sasl.SaslServer; + +import org.apache.commons.codec.binary.Base64; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto; +import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto.DataTransferEncryptorStatus; +import org.apache.hadoop.hdfs.security.token.block.BlockPoolTokenSecretManager; +import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey; +import org.apache.hadoop.security.SaslInputStream; +import org.apache.hadoop.security.SaslOutputStream; + +import com.google.common.collect.Maps; +import com.google.protobuf.ByteString; + +/** + * A class which, given connected input/output streams, will perform a + * handshake using those streams based on SASL to produce new Input/Output + * streams which will encrypt/decrypt all data written/read from said streams. + * Much of this is inspired by or borrowed from the TSaslTransport in Apache + * Thrift, but with some HDFS-specific tweaks. + */ +@InterfaceAudience.Private +public class DataTransferEncryptor { + + public static final Log LOG = LogFactory.getLog(DataTransferEncryptor.class); + + /** + * Sent by clients and validated by servers. We use a number that's unlikely + * to ever be sent as the value of the DATA_TRANSFER_VERSION. + */ + private static final int ENCRYPTED_TRANSFER_MAGIC_NUMBER = 0xDEADBEEF; + + /** + * Delimiter for the three-part SASL username string. + */ + private static final String NAME_DELIMITER = " "; + + // This has to be set as part of the SASL spec, but it don't matter for + // our purposes, but may not be empty. It's sent over the wire, so use + // a short string. + private static final String SERVER_NAME = "0"; + + private static final String PROTOCOL = "hdfs"; + private static final String MECHANISM = "DIGEST-MD5"; + private static final Map SASL_PROPS = new TreeMap(); + + static { + SASL_PROPS.put(Sasl.QOP, "auth-conf"); + SASL_PROPS.put(Sasl.SERVER_AUTH, "true"); + } + + /** + * Factory method for DNs, where the nonce, keyId, and encryption key are not + * yet known. The nonce and keyId will be sent by the client, and the DN + * will then use those pieces of info and the secret key shared with the NN + * to determine the encryptionKey used for the SASL handshake/encryption. + * + * Establishes a secure connection assuming that the party on the other end + * has the same shared secret. This does a SASL connection handshake, but not + * a general-purpose one. It's specific to the MD5-DIGEST SASL mechanism with + * auth-conf enabled. In particular, it doesn't support an arbitrary number of + * challenge/response rounds, and we know that the client will never have an + * initial response, so we don't check for one. + * + * @param underlyingOut output stream to write to the other party + * @param underlyingIn input stream to read from the other party + * @param blockPoolTokenSecretManager secret manager capable of constructing + * encryption key based on keyId, blockPoolId, and nonce + * @return a pair of streams which wrap the given streams and encrypt/decrypt + * all data read/written + * @throws IOException in the event of error + */ + public static IOStreamPair getEncryptedStreams( + OutputStream underlyingOut, InputStream underlyingIn, + BlockPoolTokenSecretManager blockPoolTokenSecretManager, + String encryptionAlgorithm) throws IOException { + + DataInputStream in = new DataInputStream(underlyingIn); + DataOutputStream out = new DataOutputStream(underlyingOut); + + Map saslProps = Maps.newHashMap(SASL_PROPS); + saslProps.put("com.sun.security.sasl.digest.cipher", encryptionAlgorithm); + + if (LOG.isDebugEnabled()) { + LOG.debug("Server using encryption algorithm " + encryptionAlgorithm); + } + + SaslParticipant sasl = new SaslParticipant(Sasl.createSaslServer(MECHANISM, + PROTOCOL, SERVER_NAME, saslProps, + new SaslServerCallbackHandler(blockPoolTokenSecretManager))); + + int magicNumber = in.readInt(); + if (magicNumber != ENCRYPTED_TRANSFER_MAGIC_NUMBER) { + throw new InvalidMagicNumberException(magicNumber); + } + try { + // step 1 + performSaslStep1(out, in, sasl); + + // step 2 (server-side only) + byte[] remoteResponse = readSaslMessage(in); + byte[] localResponse = sasl.evaluateChallengeOrResponse(remoteResponse); + sendSaslMessage(out, localResponse); + + // SASL handshake is complete + checkSaslComplete(sasl); + + return sasl.createEncryptedStreamPair(out, in); + } catch (IOException ioe) { + if (ioe instanceof SaslException && + ioe.getCause() != null && + ioe.getCause() instanceof InvalidEncryptionKeyException) { + // This could just be because the client is long-lived and hasn't gotten + // a new encryption key from the NN in a while. Upon receiving this + // error, the client will get a new encryption key from the NN and retry + // connecting to this DN. + sendInvalidKeySaslErrorMessage(out, ioe.getCause().getMessage()); + } else { + sendGenericSaslErrorMessage(out, ioe.getMessage()); + } + throw ioe; + } + } + + /** + * Factory method for clients, where the encryption token is already created. + * + * Establishes a secure connection assuming that the party on the other end + * has the same shared secret. This does a SASL connection handshake, but not + * a general-purpose one. It's specific to the MD5-DIGEST SASL mechanism with + * auth-conf enabled. In particular, it doesn't support an arbitrary number of + * challenge/response rounds, and we know that the client will never have an + * initial response, so we don't check for one. + * + * @param underlyingOut output stream to write to the other party + * @param underlyingIn input stream to read from the other party + * @param encryptionKey all info required to establish an encrypted stream + * @return a pair of streams which wrap the given streams and encrypt/decrypt + * all data read/written + * @throws IOException in the event of error + */ + public static IOStreamPair getEncryptedStreams( + OutputStream underlyingOut, InputStream underlyingIn, + DataEncryptionKey encryptionKey) + throws IOException { + + Map saslProps = Maps.newHashMap(SASL_PROPS); + saslProps.put("com.sun.security.sasl.digest.cipher", + encryptionKey.encryptionAlgorithm); + + if (LOG.isDebugEnabled()) { + LOG.debug("Client using encryption algorithm " + + encryptionKey.encryptionAlgorithm); + } + + DataOutputStream out = new DataOutputStream(underlyingOut); + DataInputStream in = new DataInputStream(underlyingIn); + + String userName = getUserNameFromEncryptionKey(encryptionKey); + SaslParticipant sasl = new SaslParticipant(Sasl.createSaslClient( + new String[] { MECHANISM }, userName, PROTOCOL, SERVER_NAME, saslProps, + new SaslClientCallbackHandler(encryptionKey.encryptionKey, userName))); + + out.writeInt(ENCRYPTED_TRANSFER_MAGIC_NUMBER); + out.flush(); + + try { + // Start of handshake - "initial response" in SASL terminology. + sendSaslMessage(out, new byte[0]); + + // step 1 + performSaslStep1(out, in, sasl); + + // step 2 (client-side only) + byte[] remoteResponse = readSaslMessage(in); + byte[] localResponse = sasl.evaluateChallengeOrResponse(remoteResponse); + assert localResponse == null; + + // SASL handshake is complete + checkSaslComplete(sasl); + + return sasl.createEncryptedStreamPair(out, in); + } catch (IOException ioe) { + sendGenericSaslErrorMessage(out, ioe.getMessage()); + throw ioe; + } + } + + private static void performSaslStep1(DataOutputStream out, DataInputStream in, + SaslParticipant sasl) throws IOException { + byte[] remoteResponse = readSaslMessage(in); + byte[] localResponse = sasl.evaluateChallengeOrResponse(remoteResponse); + sendSaslMessage(out, localResponse); + } + + private static void checkSaslComplete(SaslParticipant sasl) throws IOException { + if (!sasl.isComplete()) { + throw new IOException("Failed to complete SASL handshake"); + } + + if (!sasl.supportsConfidentiality()) { + throw new IOException("SASL handshake completed, but channel does not " + + "support encryption"); + } + } + + private static void sendSaslMessage(DataOutputStream out, byte[] payload) + throws IOException { + sendSaslMessage(out, DataTransferEncryptorStatus.SUCCESS, payload, null); + } + + private static void sendInvalidKeySaslErrorMessage(DataOutputStream out, + String message) throws IOException { + sendSaslMessage(out, DataTransferEncryptorStatus.ERROR_UNKNOWN_KEY, null, + message); + } + + private static void sendGenericSaslErrorMessage(DataOutputStream out, + String message) throws IOException { + sendSaslMessage(out, DataTransferEncryptorStatus.ERROR, null, message); + } + + private static void sendSaslMessage(OutputStream out, + DataTransferEncryptorStatus status, byte[] payload, String message) + throws IOException { + DataTransferEncryptorMessageProto.Builder builder = + DataTransferEncryptorMessageProto.newBuilder(); + + builder.setStatus(status); + if (payload != null) { + builder.setPayload(ByteString.copyFrom(payload)); + } + if (message != null) { + builder.setMessage(message); + } + + DataTransferEncryptorMessageProto proto = builder.build(); + proto.writeDelimitedTo(out); + out.flush(); + } + + private static byte[] readSaslMessage(DataInputStream in) throws IOException { + DataTransferEncryptorMessageProto proto = + DataTransferEncryptorMessageProto.parseFrom(vintPrefixed(in)); + if (proto.getStatus() == DataTransferEncryptorStatus.ERROR_UNKNOWN_KEY) { + throw new InvalidEncryptionKeyException(proto.getMessage()); + } else if (proto.getStatus() == DataTransferEncryptorStatus.ERROR) { + throw new IOException(proto.getMessage()); + } else { + return proto.getPayload().toByteArray(); + } + } + + /** + * Set the encryption key when asked by the server-side SASL object. + */ + private static class SaslServerCallbackHandler implements CallbackHandler { + + private BlockPoolTokenSecretManager blockPoolTokenSecretManager; + + public SaslServerCallbackHandler(BlockPoolTokenSecretManager + blockPoolTokenSecretManager) { + this.blockPoolTokenSecretManager = blockPoolTokenSecretManager; + } + + @Override + public void handle(Callback[] callbacks) throws IOException, + UnsupportedCallbackException { + NameCallback nc = null; + PasswordCallback pc = null; + AuthorizeCallback ac = null; + for (Callback callback : callbacks) { + if (callback instanceof AuthorizeCallback) { + ac = (AuthorizeCallback) callback; + } else if (callback instanceof PasswordCallback) { + pc = (PasswordCallback) callback; + } else if (callback instanceof NameCallback) { + nc = (NameCallback) callback; + } else if (callback instanceof RealmCallback) { + continue; // realm is ignored + } else { + throw new UnsupportedCallbackException(callback, + "Unrecognized SASL DIGEST-MD5 Callback: " + callback); + } + } + + if (pc != null) { + byte[] encryptionKey = getEncryptionKeyFromUserName( + blockPoolTokenSecretManager, nc.getDefaultName()); + pc.setPassword(encryptionKeyToPassword(encryptionKey)); + } + + if (ac != null) { + ac.setAuthorized(true); + ac.setAuthorizedID(ac.getAuthorizationID()); + } + + } + + } + + /** + * Set the encryption key when asked by the client-side SASL object. + */ + private static class SaslClientCallbackHandler implements CallbackHandler { + + private byte[] encryptionKey; + private String userName; + + public SaslClientCallbackHandler(byte[] encryptionKey, String userName) { + this.encryptionKey = encryptionKey; + this.userName = userName; + } + + @Override + public void handle(Callback[] callbacks) throws IOException, + UnsupportedCallbackException { + NameCallback nc = null; + PasswordCallback pc = null; + RealmCallback rc = null; + for (Callback callback : callbacks) { + if (callback instanceof RealmChoiceCallback) { + continue; + } else if (callback instanceof NameCallback) { + nc = (NameCallback) callback; + } else if (callback instanceof PasswordCallback) { + pc = (PasswordCallback) callback; + } else if (callback instanceof RealmCallback) { + rc = (RealmCallback) callback; + } else { + throw new UnsupportedCallbackException(callback, + "Unrecognized SASL client callback"); + } + } + if (nc != null) { + nc.setName(userName); + } + if (pc != null) { + pc.setPassword(encryptionKeyToPassword(encryptionKey)); + } + if (rc != null) { + rc.setText(rc.getDefaultText()); + } + } + + } + + /** + * The SASL username consists of the keyId, blockPoolId, and nonce with the + * first two encoded as Strings, and the third encoded using Base64. The + * fields are each separated by a single space. + * + * @param encryptionKey the encryption key to encode as a SASL username. + * @return encoded username containing keyId, blockPoolId, and nonce + */ + private static String getUserNameFromEncryptionKey( + DataEncryptionKey encryptionKey) { + return encryptionKey.keyId + NAME_DELIMITER + + encryptionKey.blockPoolId + NAME_DELIMITER + + new String(Base64.encodeBase64(encryptionKey.nonce, false)); + } + + /** + * Given a secret manager and a username encoded as described above, determine + * the encryption key. + * + * @param blockPoolTokenSecretManager to determine the encryption key. + * @param userName containing the keyId, blockPoolId, and nonce. + * @return secret encryption key. + * @throws IOException + */ + private static byte[] getEncryptionKeyFromUserName( + BlockPoolTokenSecretManager blockPoolTokenSecretManager, String userName) + throws IOException { + String[] nameComponents = userName.split(NAME_DELIMITER); + if (nameComponents.length != 3) { + throw new IOException("Provided name '" + userName + "' has " + + nameComponents.length + " components instead of the expected 3."); + } + int keyId = Integer.parseInt(nameComponents[0]); + String blockPoolId = nameComponents[1]; + byte[] nonce = Base64.decodeBase64(nameComponents[2]); + return blockPoolTokenSecretManager.retrieveDataEncryptionKey(keyId, + blockPoolId, nonce); + } + + private static char[] encryptionKeyToPassword(byte[] encryptionKey) { + return new String(Base64.encodeBase64(encryptionKey, false)).toCharArray(); + } + + /** + * Strongly inspired by Thrift's TSaslTransport class. + * + * Used to abstract over the SaslServer and + * SaslClient classes, which share a lot of their interface, but + * unfortunately don't share a common superclass. + */ + private static class SaslParticipant { + // One of these will always be null. + public SaslServer saslServer; + public SaslClient saslClient; + + public SaslParticipant(SaslServer saslServer) { + this.saslServer = saslServer; + } + + public SaslParticipant(SaslClient saslClient) { + this.saslClient = saslClient; + } + + public byte[] evaluateChallengeOrResponse(byte[] challengeOrResponse) throws SaslException { + if (saslClient != null) { + return saslClient.evaluateChallenge(challengeOrResponse); + } else { + return saslServer.evaluateResponse(challengeOrResponse); + } + } + + public boolean isComplete() { + if (saslClient != null) + return saslClient.isComplete(); + else + return saslServer.isComplete(); + } + + public boolean supportsConfidentiality() { + String qop = null; + if (saslClient != null) { + qop = (String) saslClient.getNegotiatedProperty(Sasl.QOP); + } else { + qop = (String) saslServer.getNegotiatedProperty(Sasl.QOP); + } + return qop != null && qop.equals("auth-conf"); + } + + // Return some input/output streams that will henceforth have their + // communication encrypted. + private IOStreamPair createEncryptedStreamPair( + DataOutputStream out, DataInputStream in) { + if (saslClient != null) { + return new IOStreamPair( + new SaslInputStream(in, saslClient), + new SaslOutputStream(out, saslClient)); + } else { + return new IOStreamPair( + new SaslInputStream(in, saslServer), + new SaslOutputStream(out, saslServer)); + } + } + } + + @InterfaceAudience.Private + public static class InvalidMagicNumberException extends IOException { + + private static final long serialVersionUID = 1L; + + public InvalidMagicNumberException(int magicNumber) { + super(String.format("Received %x instead of %x from client.", + magicNumber, ENCRYPTED_TRANSFER_MAGIC_NUMBER)); + } + } + +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/IOStreamPair.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/IOStreamPair.java new file mode 100644 index 00000000000..23407f8ab2d --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/IOStreamPair.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.protocol.datatransfer; + +import java.io.InputStream; +import java.io.OutputStream; + +import org.apache.hadoop.classification.InterfaceAudience; + +/** + * A little struct class to wrap an InputStream and an OutputStream. + */ +@InterfaceAudience.Private +public class IOStreamPair { + public final InputStream in; + public final OutputStream out; + + public IOStreamPair(InputStream in, OutputStream out) { + this.in = in; + this.out = out; + } +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/InvalidEncryptionKeyException.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/InvalidEncryptionKeyException.java new file mode 100644 index 00000000000..170467eb242 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/InvalidEncryptionKeyException.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.protocol.datatransfer; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +/** + * Encryption key verification failed. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class InvalidEncryptionKeyException extends IOException { + private static final long serialVersionUID = 0l; + + public InvalidEncryptionKeyException() { + super(); + } + + public InvalidEncryptionKeyException(String msg) { + super(msg); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java index 6ca9d886944..ff7a81babd7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java @@ -38,10 +38,10 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProt @InterfaceAudience.Private @InterfaceStability.Evolving public abstract class Receiver implements DataTransferProtocol { - protected final DataInputStream in; - - /** Create a receiver for DataTransferProtocol with a socket. */ - protected Receiver(final DataInputStream in) { + protected DataInputStream in; + + /** Initialize a receiver for DataTransferProtocol with a socket. */ + protected void initialize(final DataInputStream in) { this.in = in; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java index b22bcfd1b70..1a9d4e6de0c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java @@ -58,6 +58,8 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.FsyncR import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.FsyncResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetAdditionalDatanodeRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetAdditionalDatanodeResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetDataEncryptionKeyRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetDataEncryptionKeyResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetBlockLocationsRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetBlockLocationsResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetBlockLocationsResponseProto.Builder; @@ -127,6 +129,7 @@ import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockTokenIdentifierProt import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeIDProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeInfoProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.LocatedBlockProto; +import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey; import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport; import org.apache.hadoop.io.Text; @@ -830,4 +833,18 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements throw new ServiceException(e); } } + + @Override + public GetDataEncryptionKeyResponseProto getDataEncryptionKey( + RpcController controller, GetDataEncryptionKeyRequestProto request) + throws ServiceException { + try { + DataEncryptionKey encryptionKey = server.getDataEncryptionKey(); + return GetDataEncryptionKeyResponseProto.newBuilder() + .setDataEncryptionKey(PBHelper.convert(encryptionKey)) + .build(); + } catch (IOException e) { + throw new ServiceException(e); + } + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java index 6d1d38a696b..4f0792e9b51 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java @@ -63,6 +63,7 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.Distri import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.FinalizeUpgradeRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.FsyncRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetAdditionalDatanodeRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetDataEncryptionKeyRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetBlockLocationsRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetBlockLocationsResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetContentSummaryRequestProto; @@ -99,6 +100,7 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetSaf import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetTimesRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdateBlockForPipelineRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdatePipelineRequestProto; +import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey; import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport; import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException; @@ -815,9 +817,22 @@ public class ClientNamenodeProtocolTranslatorPB implements ClientNamenodeProtocolPB.class, RPC.RpcKind.RPC_PROTOCOL_BUFFER, RPC.getProtocolVersion(ClientNamenodeProtocolPB.class), methodName); } + + @Override + public DataEncryptionKey getDataEncryptionKey() throws IOException { + GetDataEncryptionKeyRequestProto req = GetDataEncryptionKeyRequestProto + .newBuilder().build(); + try { + return PBHelper.convert(rpcProxy.getDataEncryptionKey(null, req) + .getDataEncryptionKey()); + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + } @Override public Object getUnderlyingProxyObject() { return rpcProxy; } + } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java index 93fe2497e1c..44863b2b289 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java @@ -62,6 +62,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ReceivedDele import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.RegisterCommandProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.StorageReportProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.UpgradeCommandProto; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DataEncryptionKeyProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockKeyProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockTokenIdentifierProto; @@ -96,6 +97,7 @@ import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ReplicaStateProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageInfoProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.UpgradeStatusReportProto; import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.JournalInfoProto; +import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey; import org.apache.hadoop.hdfs.security.token.block.BlockKey; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys; @@ -970,12 +972,37 @@ public class PBHelper { .setIsLastBlockComplete(lb.isLastBlockComplete()).build(); } + // DataEncryptionKey + public static DataEncryptionKey convert(DataEncryptionKeyProto bet) { + String encryptionAlgorithm = bet.getEncryptionAlgorithm(); + return new DataEncryptionKey(bet.getKeyId(), + bet.getBlockPoolId(), + bet.getNonce().toByteArray(), + bet.getEncryptionKey().toByteArray(), + bet.getExpiryDate(), + encryptionAlgorithm.isEmpty() ? null : encryptionAlgorithm); + } + + public static DataEncryptionKeyProto convert(DataEncryptionKey bet) { + DataEncryptionKeyProto.Builder b = DataEncryptionKeyProto.newBuilder() + .setKeyId(bet.keyId) + .setBlockPoolId(bet.blockPoolId) + .setNonce(ByteString.copyFrom(bet.nonce)) + .setEncryptionKey(ByteString.copyFrom(bet.encryptionKey)) + .setExpiryDate(bet.expiryDate); + if (bet.encryptionAlgorithm != null) { + b.setEncryptionAlgorithm(bet.encryptionAlgorithm); + } + return b.build(); + } + public static FsServerDefaults convert(FsServerDefaultsProto fs) { if (fs == null) return null; return new FsServerDefaults( fs.getBlockSize(), fs.getBytesPerChecksum(), fs.getWritePacketSize(), (short) fs.getReplication(), - fs.getFileBufferSize()); + fs.getFileBufferSize(), + fs.getEncryptDataTransfer()); } public static FsServerDefaultsProto convert(FsServerDefaults fs) { @@ -983,7 +1010,10 @@ public class PBHelper { return FsServerDefaultsProto.newBuilder(). setBlockSize(fs.getBlockSize()). setBytesPerChecksum(fs.getBytesPerChecksum()). - setWritePacketSize(fs.getWritePacketSize()).setReplication(fs.getReplication()).setFileBufferSize(fs.getFileBufferSize()).build(); + setWritePacketSize(fs.getWritePacketSize()) + .setReplication(fs.getReplication()) + .setFileBufferSize(fs.getFileBufferSize()) + .setEncryptDataTransfer(fs.getEncryptDataTransfer()).build(); } public static FsPermissionProto convert(FsPermission p) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockPoolTokenSecretManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockPoolTokenSecretManager.java index b65d073f60b..0df7067509a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockPoolTokenSecretManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockPoolTokenSecretManager.java @@ -119,4 +119,13 @@ public class BlockPoolTokenSecretManager extends btsm.clearAllKeysForTesting(); } } + + public DataEncryptionKey generateDataEncryptionKey(String blockPoolId) { + return get(blockPoolId).generateDataEncryptionKey(); + } + + public byte[] retrieveDataEncryptionKey(int keyId, String blockPoolId, + byte[] nonce) throws IOException { + return get(blockPoolId).retrieveDataEncryptionKey(keyId, nonce); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java index 2b76ad5e539..954f1698589 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java @@ -32,6 +32,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException; import org.apache.hadoop.io.WritableUtils; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.SecretManager; @@ -74,6 +75,10 @@ public class BlockTokenSecretManager extends private BlockKey currentKey; private BlockKey nextKey; private Map allKeys; + private String blockPoolId; + private String encryptionAlgorithm; + + private SecureRandom nonceGenerator = new SecureRandom(); public static enum AccessMode { READ, WRITE, COPY, REPLACE @@ -86,8 +91,9 @@ public class BlockTokenSecretManager extends * @param tokenLifetime how long an individual token is valid */ public BlockTokenSecretManager(long keyUpdateInterval, - long tokenLifetime) { - this(false, keyUpdateInterval, tokenLifetime); + long tokenLifetime, String blockPoolId, String encryptionAlgorithm) { + this(false, keyUpdateInterval, tokenLifetime, blockPoolId, + encryptionAlgorithm); } /** @@ -100,8 +106,10 @@ public class BlockTokenSecretManager extends * @param otherNnId the NN ID of the other NN in an HA setup */ public BlockTokenSecretManager(long keyUpdateInterval, - long tokenLifetime, int nnIndex) { - this(true, keyUpdateInterval, tokenLifetime); + long tokenLifetime, int nnIndex, String blockPoolId, + String encryptionAlgorithm) { + this(true, keyUpdateInterval, tokenLifetime, blockPoolId, + encryptionAlgorithm); Preconditions.checkArgument(nnIndex == 0 || nnIndex == 1); this.nnIndex = nnIndex; setSerialNo(new SecureRandom().nextInt()); @@ -109,17 +117,24 @@ public class BlockTokenSecretManager extends } private BlockTokenSecretManager(boolean isMaster, long keyUpdateInterval, - long tokenLifetime) { + long tokenLifetime, String blockPoolId, String encryptionAlgorithm) { this.isMaster = isMaster; this.keyUpdateInterval = keyUpdateInterval; this.tokenLifetime = tokenLifetime; this.allKeys = new HashMap(); + this.blockPoolId = blockPoolId; + this.encryptionAlgorithm = encryptionAlgorithm; + generateKeys(); } @VisibleForTesting public synchronized void setSerialNo(int serialNo) { this.serialNo = (serialNo & LOW_MASK) | (nnIndex << 31); } + + public void setBlockPoolId(String blockPoolId) { + this.blockPoolId = blockPoolId; + } /** Initialize block keys */ private synchronized void generateKeys() { @@ -371,6 +386,49 @@ public class BlockTokenSecretManager extends return createPassword(identifier.getBytes(), key.getKey()); } + /** + * Generate a data encryption key for this block pool, using the current + * BlockKey. + * + * @return a data encryption key which may be used to encrypt traffic + * over the DataTransferProtocol + */ + public DataEncryptionKey generateDataEncryptionKey() { + byte[] nonce = new byte[8]; + nonceGenerator.nextBytes(nonce); + BlockKey key = null; + synchronized (this) { + key = currentKey; + } + byte[] encryptionKey = createPassword(nonce, key.getKey()); + return new DataEncryptionKey(key.getKeyId(), blockPoolId, nonce, + encryptionKey, Time.now() + tokenLifetime, + encryptionAlgorithm); + } + + /** + * Recreate an encryption key based on the given key id and nonce. + * + * @param keyId identifier of the secret key used to generate the encryption key. + * @param nonce random value used to create the encryption key + * @return the encryption key which corresponds to this (keyId, blockPoolId, nonce) + * @throws InvalidToken + * @throws InvalidEncryptionKeyException + */ + public byte[] retrieveDataEncryptionKey(int keyId, byte[] nonce) + throws InvalidEncryptionKeyException { + BlockKey key = null; + synchronized (this) { + key = allKeys.get(keyId); + if (key == null) { + throw new InvalidEncryptionKeyException("Can't re-compute encryption key" + + " for nonce, since the required block key (keyID=" + keyId + + ") doesn't exist. Current key: " + currentKey.getKeyId()); + } + } + return createPassword(nonce, key.getKey()); + } + @VisibleForTesting public synchronized void setKeyUpdateIntervalForTesting(long millis) { this.keyUpdateInterval = millis; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/DataEncryptionKey.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/DataEncryptionKey.java new file mode 100644 index 00000000000..41c84e2e4d6 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/DataEncryptionKey.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.security.token.block; + +import org.apache.hadoop.classification.InterfaceAudience; + +/** + * A little struct class to contain all fields required to perform encryption of + * the DataTransferProtocol. + */ +@InterfaceAudience.Private +public class DataEncryptionKey { + public final int keyId; + public final String blockPoolId; + public final byte[] nonce; + public final byte[] encryptionKey; + public final long expiryDate; + public final String encryptionAlgorithm; + + public DataEncryptionKey(int keyId, String blockPoolId, byte[] nonce, + byte[] encryptionKey, long expiryDate, String encryptionAlgorithm) { + this.keyId = keyId; + this.blockPoolId = blockPoolId; + this.nonce = nonce; + this.encryptionKey = encryptionKey; + this.expiryDate = expiryDate; + this.encryptionAlgorithm = encryptionAlgorithm; + } + + @Override + public String toString() { + return keyId + "/" + blockPoolId + "/" + nonce.length + "/" + + encryptionKey.length; + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java index aa9576b4f56..26a0c621720 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java @@ -24,6 +24,8 @@ import java.io.BufferedOutputStream; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; import java.net.Socket; import java.net.URI; import java.text.DateFormat; @@ -57,6 +59,8 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; +import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor; +import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair; import org.apache.hadoop.hdfs.protocol.datatransfer.Sender; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; @@ -311,11 +315,22 @@ public class Balancer { NetUtils.createSocketAddr(target.datanode.getXferAddr()), HdfsServerConstants.READ_TIMEOUT); sock.setKeepAlive(true); - out = new DataOutputStream( new BufferedOutputStream( - sock.getOutputStream(), HdfsConstants.IO_FILE_BUFFER_SIZE)); + + OutputStream unbufOut = sock.getOutputStream(); + InputStream unbufIn = sock.getInputStream(); + if (nnc.getDataEncryptionKey() != null) { + IOStreamPair encryptedStreams = + DataTransferEncryptor.getEncryptedStreams( + unbufOut, unbufIn, nnc.getDataEncryptionKey()); + unbufOut = encryptedStreams.out; + unbufIn = encryptedStreams.in; + } + out = new DataOutputStream(new BufferedOutputStream(unbufOut, + HdfsConstants.IO_FILE_BUFFER_SIZE)); + in = new DataInputStream(new BufferedInputStream(unbufIn, + HdfsConstants.IO_FILE_BUFFER_SIZE)); + sendRequest(out); - in = new DataInputStream( new BufferedInputStream( - sock.getInputStream(), HdfsConstants.IO_FILE_BUFFER_SIZE)); receiveResponse(in); bytesMoved.inc(block.getNumBytes()); LOG.info( "Moving block " + block.getBlock().getBlockId() + diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java index 7d0d373a206..13709458aab 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java @@ -29,10 +29,12 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.NameNodeProxies; import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException; import org.apache.hadoop.hdfs.protocol.ClientProtocol; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager; import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys; @@ -60,10 +62,12 @@ class NameNodeConnector { final OutputStream out; private final boolean isBlockTokenEnabled; + private final boolean encryptDataTransfer; private boolean shouldRun; private long keyUpdaterInterval; private BlockTokenSecretManager blockTokenSecretManager; private Daemon keyupdaterthread; // AccessKeyUpdater thread + private DataEncryptionKey encryptionKey; NameNodeConnector(URI nameNodeUri, Configuration conf) throws IOException { @@ -88,8 +92,11 @@ class NameNodeConnector { LOG.info("Block token params received from NN: keyUpdateInterval=" + blockKeyUpdateInterval / (60 * 1000) + " min(s), tokenLifetime=" + blockTokenLifetime / (60 * 1000) + " min(s)"); + String encryptionAlgorithm = conf.get( + DFSConfigKeys.DFS_DATA_ENCRYPTION_ALGORITHM_KEY); this.blockTokenSecretManager = new BlockTokenSecretManager( - blockKeyUpdateInterval, blockTokenLifetime); + blockKeyUpdateInterval, blockTokenLifetime, blockpoolID, + encryptionAlgorithm); this.blockTokenSecretManager.addKeys(keys); /* * Balancer should sync its block keys with NN more frequently than NN @@ -102,7 +109,8 @@ class NameNodeConnector { this.shouldRun = true; this.keyupdaterthread.start(); } - + this.encryptDataTransfer = fs.getServerDefaults(new Path("/")) + .getEncryptDataTransfer(); // Check if there is another balancer running. // Exit if there is another one running. out = checkAndMarkRunningBalancer(); @@ -126,6 +134,20 @@ class NameNodeConnector { BlockTokenSecretManager.AccessMode.COPY)); } } + + DataEncryptionKey getDataEncryptionKey() + throws IOException { + if (encryptDataTransfer) { + synchronized (this) { + if (encryptionKey == null) { + encryptionKey = blockTokenSecretManager.generateDataEncryptionKey(); + } + return encryptionKey; + } + } else { + return null; + } + } /* The idea for making sure that there is no more than one balancer * running in an HDFS is to create a file in the HDFS, writes the IP address @@ -208,4 +230,4 @@ class NameNodeConnector { } } } -} \ No newline at end of file +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index 3af15f84ffb..a7e96f48337 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -50,6 +50,7 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException; +import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey; import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager; import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager.AccessMode; import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys; @@ -206,6 +207,9 @@ public class BlockManager { /** variable to enable check for enough racks */ final boolean shouldCheckForEnoughRacks; + + // whether or not to issue block encryption keys. + final boolean encryptDataTransfer; /** * When running inside a Standby node, the node may receive block reports @@ -286,12 +290,18 @@ public class BlockManager { this.replicationRecheckInterval = conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_DEFAULT) * 1000L; + + this.encryptDataTransfer = + conf.getBoolean(DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY, + DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_DEFAULT); + LOG.info("defaultReplication = " + defaultReplication); LOG.info("maxReplication = " + maxReplication); LOG.info("minReplication = " + minReplication); LOG.info("maxReplicationStreams = " + maxReplicationStreams); LOG.info("shouldCheckForEnoughRacks = " + shouldCheckForEnoughRacks); LOG.info("replicationRecheckInterval = " + replicationRecheckInterval); + LOG.info("encryptDataTransfer = " + encryptDataTransfer); } private static BlockTokenSecretManager createBlockTokenSecretManager( @@ -311,10 +321,14 @@ public class BlockManager { final long lifetimeMin = conf.getLong( DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_LIFETIME_KEY, DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_LIFETIME_DEFAULT); + final String encryptionAlgorithm = conf.get( + DFSConfigKeys.DFS_DATA_ENCRYPTION_ALGORITHM_KEY); LOG.info(DFSConfigKeys.DFS_BLOCK_ACCESS_KEY_UPDATE_INTERVAL_KEY + "=" + updateMin + " min(s), " + DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_LIFETIME_KEY - + "=" + lifetimeMin + " min(s)"); + + "=" + lifetimeMin + " min(s), " + + DFSConfigKeys.DFS_DATA_ENCRYPTION_ALGORITHM_KEY + + "=" + encryptionAlgorithm); String nsId = DFSUtil.getNamenodeNameServiceId(conf); boolean isHaEnabled = HAUtil.isHAEnabled(conf, nsId); @@ -323,10 +337,17 @@ public class BlockManager { String thisNnId = HAUtil.getNameNodeId(conf, nsId); String otherNnId = HAUtil.getNameNodeIdOfOtherNode(conf, nsId); return new BlockTokenSecretManager(updateMin*60*1000L, - lifetimeMin*60*1000L, thisNnId.compareTo(otherNnId) < 0 ? 0 : 1); + lifetimeMin*60*1000L, thisNnId.compareTo(otherNnId) < 0 ? 0 : 1, null, + encryptionAlgorithm); } else { return new BlockTokenSecretManager(updateMin*60*1000L, - lifetimeMin*60*1000L, 0); + lifetimeMin*60*1000L, 0, null, encryptionAlgorithm); + } + } + + public void setBlockPoolId(String blockPoolId) { + if (isBlockTokenEnabled()) { + blockTokenSecretManager.setBlockPoolId(blockPoolId); } } @@ -793,6 +814,14 @@ public class BlockManager { nodeinfo.needKeyUpdate = false; } } + + public DataEncryptionKey generateDataEncryptionKey() { + if (isBlockTokenEnabled() && encryptDataTransfer) { + return blockTokenSecretManager.generateDataEncryptionKey(); + } else { + return null; + } + } /** * Clamp the specified replication between the minimum and the maximum diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java index cce5f74d83c..60a1216d120 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java @@ -50,6 +50,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; +import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor; @@ -195,7 +196,8 @@ public class JspHelper { public static void streamBlockInAscii(InetSocketAddress addr, String poolId, long blockId, Token blockToken, long genStamp, long blockSize, long offsetIntoBlock, long chunkSizeToView, - JspWriter out, Configuration conf) throws IOException { + JspWriter out, Configuration conf, DataEncryptionKey encryptionKey) + throws IOException { if (chunkSizeToView == 0) return; Socket s = NetUtils.getDefaultSocketFactory(conf).createSocket(); s.connect(addr, HdfsServerConstants.READ_TIMEOUT); @@ -208,7 +210,7 @@ public class JspHelper { BlockReader blockReader = BlockReaderFactory.newBlockReader( conf, s, file, new ExtendedBlock(poolId, blockId, 0, genStamp), blockToken, - offsetIntoBlock, amtToRead); + offsetIntoBlock, amtToRead, encryptionKey); byte[] buf = new byte[(int)amtToRead]; int readOffset = 0; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java index 92f1edc2fca..3f37a7eea38 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java @@ -33,7 +33,9 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAUL import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MIN_SUPPORTED_NAMENODE_VERSION_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MIN_SUPPORTED_NAMENODE_VERSION_DEFAULT; - +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_ENCRYPTION_ALGORITHM_KEY; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; @@ -52,6 +54,7 @@ class DNConf { final boolean syncBehindWrites; final boolean dropCacheBehindReads; final boolean syncOnClose; + final boolean encryptDataTransfer; final long readaheadLength; @@ -62,6 +65,7 @@ class DNConf { final int writePacketSize; final String minimumNameNodeVersion; + final String encryptionAlgorithm; public DNConf(Configuration conf) { socketTimeout = conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, @@ -117,6 +121,10 @@ class DNConf { this.minimumNameNodeVersion = conf.get(DFS_DATANODE_MIN_SUPPORTED_NAMENODE_VERSION_KEY, DFS_DATANODE_MIN_SUPPORTED_NAMENODE_VERSION_DEFAULT); + + this.encryptDataTransfer = conf.getBoolean(DFS_ENCRYPT_DATA_TRANSFER_KEY, + DFS_ENCRYPT_DATA_TRANSFER_DEFAULT); + this.encryptionAlgorithm = conf.get(DFS_DATA_ENCRYPTION_ALGORITHM_KEY); } // We get minimumNameNodeVersion via a method so it can be mocked out in tests. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index 25298fd5faa..0ebde007542 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@ -53,6 +53,7 @@ import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.File; import java.io.IOException; +import java.io.InputStream; import java.io.OutputStream; import java.net.InetSocketAddress; import java.net.ServerSocket; @@ -100,6 +101,8 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException; import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage; import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol; +import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor; +import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair; import org.apache.hadoop.hdfs.protocol.datatransfer.Sender; import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.ClientDatanodeProtocolService; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DNTransferAckProto; @@ -737,8 +740,6 @@ public class DataNode extends Configured + " tokens, or none may be."); } } - // TODO should we check that all federated nns are either enabled or - // disabled? if (!isBlockTokenEnabled) return; if (!blockPoolTokenSecretManager.isBlockPoolRegistered(blockPoolId)) { @@ -750,7 +751,8 @@ public class DataNode extends Configured + " min(s), tokenLifetime=" + blockTokenLifetime / (60 * 1000) + " min(s)"); final BlockTokenSecretManager secretMgr = - new BlockTokenSecretManager(0, blockTokenLifetime); + new BlockTokenSecretManager(0, blockTokenLifetime, blockPoolId, + dnConf.encryptionAlgorithm); blockPoolTokenSecretManager.addBlockPool(blockPoolId, secretMgr); } } @@ -1390,9 +1392,21 @@ public class DataNode extends Configured long writeTimeout = dnConf.socketWriteTimeout + HdfsServerConstants.WRITE_TIMEOUT_EXTENSION * (targets.length-1); - OutputStream baseStream = NetUtils.getOutputStream(sock, writeTimeout); - out = new DataOutputStream(new BufferedOutputStream(baseStream, + OutputStream unbufOut = NetUtils.getOutputStream(sock, writeTimeout); + InputStream unbufIn = NetUtils.getInputStream(sock); + if (dnConf.encryptDataTransfer) { + IOStreamPair encryptedStreams = + DataTransferEncryptor.getEncryptedStreams( + unbufOut, unbufIn, + blockPoolTokenSecretManager.generateDataEncryptionKey( + b.getBlockPoolId())); + unbufOut = encryptedStreams.out; + unbufIn = encryptedStreams.in; + } + + out = new DataOutputStream(new BufferedOutputStream(unbufOut, HdfsConstants.SMALL_BUFFER_SIZE)); + in = new DataInputStream(unbufIn); blockSender = new BlockSender(b, 0, b.getNumBytes(), false, false, DataNode.this, null); DatanodeInfo srcNode = new DatanodeInfo(bpReg); @@ -1410,7 +1424,7 @@ public class DataNode extends Configured stage, 0, 0, 0, 0, blockSender.getChecksum()); // send data & checksum - blockSender.sendBlock(out, baseStream, null); + blockSender.sendBlock(out, unbufOut, null); // no response necessary LOG.info(getClass().getSimpleName() + ": Transmitted " + b @@ -1418,7 +1432,6 @@ public class DataNode extends Configured // read ack if (isClient) { - in = new DataInputStream(NetUtils.getInputStream(sock)); DNTransferAckProto closeAck = DNTransferAckProto.parseFrom( HdfsProtoUtil.vintPrefixed(in)); if (LOG.isDebugEnabled()) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java index 9870bc2e0c4..8547df94873 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java @@ -29,6 +29,7 @@ import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.EOFException; import java.io.IOException; +import java.io.InputStream; import java.io.InterruptedIOException; import java.io.OutputStream; import java.net.InetSocketAddress; @@ -43,7 +44,10 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.HdfsProtoUtil; import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage; +import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor.InvalidMagicNumberException; import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil; +import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor; +import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair; import org.apache.hadoop.hdfs.protocol.datatransfer.Op; import org.apache.hadoop.hdfs.protocol.datatransfer.Receiver; import org.apache.hadoop.hdfs.protocol.datatransfer.Sender; @@ -84,7 +88,8 @@ class DataXceiver extends Receiver implements Runnable { private final DataXceiverServer dataXceiverServer; private long opStartTime; //the start time of receiving an Op - private final SocketInputWrapper socketInputWrapper; + private final SocketInputWrapper socketIn; + private OutputStream socketOut; /** * Client Name used in previous operation. Not available on first request @@ -94,23 +99,19 @@ class DataXceiver extends Receiver implements Runnable { public static DataXceiver create(Socket s, DataNode dn, DataXceiverServer dataXceiverServer) throws IOException { - - SocketInputWrapper iw = NetUtils.getInputStream(s); - return new DataXceiver(s, iw, dn, dataXceiverServer); + return new DataXceiver(s, dn, dataXceiverServer); } private DataXceiver(Socket s, - SocketInputWrapper socketInput, DataNode datanode, DataXceiverServer dataXceiverServer) throws IOException { - super(new DataInputStream(new BufferedInputStream( - socketInput, HdfsConstants.SMALL_BUFFER_SIZE))); this.s = s; - this.socketInputWrapper = socketInput; + this.dnConf = datanode.getDnConf(); + this.socketIn = NetUtils.getInputStream(s); + this.socketOut = NetUtils.getOutputStream(s, dnConf.socketWriteTimeout); this.isLocal = s.getInetAddress().equals(s.getLocalAddress()); this.datanode = datanode; - this.dnConf = datanode.getDnConf(); this.dataXceiverServer = dataXceiverServer; remoteAddress = s.getRemoteSocketAddress().toString(); localAddress = s.getLocalSocketAddress().toString(); @@ -141,6 +142,10 @@ class DataXceiver extends Receiver implements Runnable { /** Return the datanode object. */ DataNode getDataNode() {return datanode;} + + private OutputStream getOutputStream() throws IOException { + return socketOut; + } /** * Read/write data from/to the DataXceiverServer. @@ -149,8 +154,31 @@ class DataXceiver extends Receiver implements Runnable { public void run() { int opsProcessed = 0; Op op = null; + dataXceiverServer.childSockets.add(s); + try { + + InputStream input = socketIn; + if (dnConf.encryptDataTransfer) { + IOStreamPair encryptedStreams = null; + try { + encryptedStreams = DataTransferEncryptor.getEncryptedStreams(socketOut, + socketIn, datanode.blockPoolTokenSecretManager, + dnConf.encryptionAlgorithm); + } catch (InvalidMagicNumberException imne) { + LOG.info("Failed to read expected encryption handshake from client " + + "at " + s.getInetAddress() + ". Perhaps the client is running an " + + "older version of Hadoop which does not support encryption."); + return; + } + input = encryptedStreams.in; + socketOut = encryptedStreams.out; + } + input = new BufferedInputStream(input, HdfsConstants.SMALL_BUFFER_SIZE); + + super.initialize(new DataInputStream(input)); + // We process requests in a loop, and stay around for a short timeout. // This optimistic behaviour allows the other end to reuse connections. // Setting keepalive timeout to 0 disable this behavior. @@ -160,9 +188,9 @@ class DataXceiver extends Receiver implements Runnable { try { if (opsProcessed != 0) { assert dnConf.socketKeepaliveTimeout > 0; - socketInputWrapper.setTimeout(dnConf.socketKeepaliveTimeout); + socketIn.setTimeout(dnConf.socketKeepaliveTimeout); } else { - socketInputWrapper.setTimeout(dnConf.socketTimeout); + socketIn.setTimeout(dnConf.socketTimeout); } op = readOp(); } catch (InterruptedIOException ignored) { @@ -215,8 +243,7 @@ class DataXceiver extends Receiver implements Runnable { final long length) throws IOException { previousOpClientName = clientName; - OutputStream baseStream = NetUtils.getOutputStream(s, - dnConf.socketWriteTimeout); + OutputStream baseStream = getOutputStream(); DataOutputStream out = new DataOutputStream(new BufferedOutputStream( baseStream, HdfsConstants.SMALL_BUFFER_SIZE)); checkAccess(out, true, block, blockToken, @@ -242,13 +269,12 @@ class DataXceiver extends Receiver implements Runnable { } catch(IOException e) { String msg = "opReadBlock " + block + " received exception " + e; LOG.info(msg); - sendResponse(s, ERROR, msg, dnConf.socketWriteTimeout); + sendResponse(ERROR, msg); throw e; } // send op status - writeSuccessWithChecksumInfo(blockSender, - getStreamWithTimeout(s, dnConf.socketWriteTimeout)); + writeSuccessWithChecksumInfo(blockSender, new DataOutputStream(getOutputStream())); long read = blockSender.sendBlock(out, baseStream, null); // send data @@ -347,7 +373,7 @@ class DataXceiver extends Receiver implements Runnable { // reply to upstream datanode or client final DataOutputStream replyOut = new DataOutputStream( new BufferedOutputStream( - NetUtils.getOutputStream(s, dnConf.socketWriteTimeout), + getOutputStream(), HdfsConstants.SMALL_BUFFER_SIZE)); checkAccess(replyOut, isClient, block, blockToken, Op.WRITE_BLOCK, BlockTokenSecretManager.AccessMode.WRITE); @@ -389,11 +415,23 @@ class DataXceiver extends Receiver implements Runnable { NetUtils.connect(mirrorSock, mirrorTarget, timeoutValue); mirrorSock.setSoTimeout(timeoutValue); mirrorSock.setSendBufferSize(HdfsConstants.DEFAULT_DATA_SOCKET_SIZE); - mirrorOut = new DataOutputStream( - new BufferedOutputStream( - NetUtils.getOutputStream(mirrorSock, writeTimeout), - HdfsConstants.SMALL_BUFFER_SIZE)); - mirrorIn = new DataInputStream(NetUtils.getInputStream(mirrorSock)); + + OutputStream unbufMirrorOut = NetUtils.getOutputStream(mirrorSock, + writeTimeout); + InputStream unbufMirrorIn = NetUtils.getInputStream(mirrorSock); + if (dnConf.encryptDataTransfer) { + IOStreamPair encryptedStreams = + DataTransferEncryptor.getEncryptedStreams( + unbufMirrorOut, unbufMirrorIn, + datanode.blockPoolTokenSecretManager + .generateDataEncryptionKey(block.getBlockPoolId())); + + unbufMirrorOut = encryptedStreams.out; + unbufMirrorIn = encryptedStreams.in; + } + mirrorOut = new DataOutputStream(new BufferedOutputStream(unbufMirrorOut, + HdfsConstants.SMALL_BUFFER_SIZE)); + mirrorIn = new DataInputStream(unbufMirrorIn); new Sender(mirrorOut).writeBlock(originalBlock, blockToken, clientname, targets, srcDataNode, stage, pipelineSize, @@ -520,7 +558,7 @@ class DataXceiver extends Receiver implements Runnable { updateCurrentThreadName(Op.TRANSFER_BLOCK + " " + blk); final DataOutputStream out = new DataOutputStream( - NetUtils.getOutputStream(s, dnConf.socketWriteTimeout)); + getOutputStream()); try { datanode.transferReplicaForPipelineRecovery(blk, targets, clientName); writeResponse(Status.SUCCESS, null, out); @@ -533,7 +571,7 @@ class DataXceiver extends Receiver implements Runnable { public void blockChecksum(final ExtendedBlock block, final Token blockToken) throws IOException { final DataOutputStream out = new DataOutputStream( - NetUtils.getOutputStream(s, dnConf.socketWriteTimeout)); + getOutputStream()); checkAccess(out, true, block, blockToken, Op.BLOCK_CHECKSUM, BlockTokenSecretManager.AccessMode.READ); updateCurrentThreadName("Reading metadata for block " + block); @@ -593,7 +631,7 @@ class DataXceiver extends Receiver implements Runnable { LOG.warn("Invalid access token in request from " + remoteAddress + " for OP_COPY_BLOCK for block " + block + " : " + e.getLocalizedMessage()); - sendResponse(s, ERROR_ACCESS_TOKEN, "Invalid access token", dnConf.socketWriteTimeout); + sendResponse(ERROR_ACCESS_TOKEN, "Invalid access token"); return; } @@ -603,7 +641,7 @@ class DataXceiver extends Receiver implements Runnable { String msg = "Not able to copy block " + block.getBlockId() + " to " + s.getRemoteSocketAddress() + " because threads quota is exceeded."; LOG.info(msg); - sendResponse(s, ERROR, msg, dnConf.socketWriteTimeout); + sendResponse(ERROR, msg); return; } @@ -617,8 +655,7 @@ class DataXceiver extends Receiver implements Runnable { null); // set up response stream - OutputStream baseStream = NetUtils.getOutputStream( - s, dnConf.socketWriteTimeout); + OutputStream baseStream = getOutputStream(); reply = new DataOutputStream(new BufferedOutputStream( baseStream, HdfsConstants.SMALL_BUFFER_SIZE)); @@ -670,8 +707,7 @@ class DataXceiver extends Receiver implements Runnable { LOG.warn("Invalid access token in request from " + remoteAddress + " for OP_REPLACE_BLOCK for block " + block + " : " + e.getLocalizedMessage()); - sendResponse(s, ERROR_ACCESS_TOKEN, "Invalid access token", - dnConf.socketWriteTimeout); + sendResponse(ERROR_ACCESS_TOKEN, "Invalid access token"); return; } } @@ -680,7 +716,7 @@ class DataXceiver extends Receiver implements Runnable { String msg = "Not able to receive block " + block.getBlockId() + " from " + s.getRemoteSocketAddress() + " because threads quota is exceeded."; LOG.warn(msg); - sendResponse(s, ERROR, msg, dnConf.socketWriteTimeout); + sendResponse(ERROR, msg); return; } @@ -699,17 +735,29 @@ class DataXceiver extends Receiver implements Runnable { NetUtils.connect(proxySock, proxyAddr, dnConf.socketTimeout); proxySock.setSoTimeout(dnConf.socketTimeout); - OutputStream baseStream = NetUtils.getOutputStream(proxySock, + OutputStream unbufProxyOut = NetUtils.getOutputStream(proxySock, dnConf.socketWriteTimeout); - proxyOut = new DataOutputStream(new BufferedOutputStream(baseStream, + InputStream unbufProxyIn = NetUtils.getInputStream(proxySock); + if (dnConf.encryptDataTransfer) { + IOStreamPair encryptedStreams = + DataTransferEncryptor.getEncryptedStreams( + unbufProxyOut, unbufProxyIn, + datanode.blockPoolTokenSecretManager + .generateDataEncryptionKey(block.getBlockPoolId())); + unbufProxyOut = encryptedStreams.out; + unbufProxyIn = encryptedStreams.in; + } + + proxyOut = new DataOutputStream(new BufferedOutputStream(unbufProxyOut, HdfsConstants.SMALL_BUFFER_SIZE)); + proxyReply = new DataInputStream(new BufferedInputStream(unbufProxyIn, + HdfsConstants.IO_FILE_BUFFER_SIZE)); /* send request to the proxy */ new Sender(proxyOut).copyBlock(block, blockToken); // receive the response from the proxy - proxyReply = new DataInputStream(new BufferedInputStream( - NetUtils.getInputStream(proxySock), HdfsConstants.IO_FILE_BUFFER_SIZE)); + BlockOpResponseProto copyResponse = BlockOpResponseProto.parseFrom( HdfsProtoUtil.vintPrefixed(proxyReply)); @@ -762,7 +810,7 @@ class DataXceiver extends Receiver implements Runnable { // send response back try { - sendResponse(s, opStatus, errMsg, dnConf.socketWriteTimeout); + sendResponse(opStatus, errMsg); } catch (IOException ioe) { LOG.warn("Error writing reply back to " + s.getRemoteSocketAddress()); } @@ -781,20 +829,13 @@ class DataXceiver extends Receiver implements Runnable { /** * Utility function for sending a response. - * @param s socket to write to + * * @param opStatus status message to write - * @param timeout send timeout - **/ - private static void sendResponse(Socket s, Status status, String message, - long timeout) throws IOException { - DataOutputStream reply = getStreamWithTimeout(s, timeout); - - writeResponse(status, message, reply); - } - - private static DataOutputStream getStreamWithTimeout(Socket s, long timeout) - throws IOException { - return new DataOutputStream(NetUtils.getOutputStream(s, timeout)); + * @param message message to send to the client or other DN + */ + private void sendResponse(Status status, + String message) throws IOException { + writeResponse(status, message, getOutputStream()); } private static void writeResponse(Status status, String message, OutputStream out) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java index d41c1d0107b..e09440efe50 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java @@ -606,7 +606,7 @@ public class DatanodeJspHelper { try { JspHelper.streamBlockInAscii(new InetSocketAddress(req.getServerName(), datanodePort), bpid, blockId, blockToken, genStamp, blockSize, - startOffset, chunkSizeToView, out, conf); + startOffset, chunkSizeToView, out, conf, dfs.getDataEncryptionKey()); } catch (Exception e) { out.print(e); } @@ -699,7 +699,7 @@ public class DatanodeJspHelper { out.print(""); dfs.close(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index 94cdecace4a..8e5915c1b65 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -25,6 +25,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAUL import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_STANDBY_CHECKPOINTS_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_STANDBY_CHECKPOINTS_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_ACCESSTIME_PRECISION_KEY; @@ -461,7 +463,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats, conf.getInt(DFS_BYTES_PER_CHECKSUM_KEY, DFS_BYTES_PER_CHECKSUM_DEFAULT), conf.getInt(DFS_CLIENT_WRITE_PACKET_SIZE_KEY, DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT), (short) conf.getInt(DFS_REPLICATION_KEY, DFS_REPLICATION_DEFAULT), - conf.getInt(IO_FILE_BUFFER_SIZE_KEY, IO_FILE_BUFFER_SIZE_DEFAULT)); + conf.getInt(IO_FILE_BUFFER_SIZE_KEY, IO_FILE_BUFFER_SIZE_DEFAULT), + conf.getBoolean(DFS_ENCRYPT_DATA_TRANSFER_KEY, DFS_ENCRYPT_DATA_TRANSFER_DEFAULT)); this.maxFsObjects = conf.getLong(DFS_NAMENODE_MAX_OBJECTS_KEY, DFS_NAMENODE_MAX_OBJECTS_DEFAULT); @@ -2016,6 +2019,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats, void setBlockPoolId(String bpid) { blockPoolId = bpid; + blockManager.setBlockPoolId(blockPoolId); } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileChecksumServlets.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileChecksumServlets.java index a8c1edae417..39f9094e684 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileChecksumServlets.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileChecksumServlets.java @@ -127,7 +127,7 @@ public class FileChecksumServlets { datanode, conf, getUGI(request, conf)); final ClientProtocol nnproxy = dfs.getNamenode(); final MD5MD5CRC32FileChecksum checksum = DFSClient.getFileChecksum( - path, nnproxy, socketFactory, socketTimeout); + path, nnproxy, socketFactory, socketTimeout, dfs.getDataEncryptionKey()); MD5MD5CRC32FileChecksum.write(xml, checksum); } catch(IOException ioe) { writeXml(ioe, path, xml); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java index 7f5a5ad2f62..7678cd1d081 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java @@ -84,6 +84,7 @@ import org.apache.hadoop.hdfs.protocolPB.RefreshAuthorizationPolicyProtocolPB; import org.apache.hadoop.hdfs.protocolPB.RefreshAuthorizationPolicyProtocolServerSideTranslatorPB; import org.apache.hadoop.hdfs.protocolPB.RefreshUserMappingsProtocolPB; import org.apache.hadoop.hdfs.protocolPB.RefreshUserMappingsProtocolServerSideTranslatorPB; +import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey; import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys; import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole; @@ -1048,4 +1049,9 @@ class NameNodeRpcServer implements NamenodeProtocols { } return clientMachine; } + + @Override + public DataEncryptionKey getDataEncryptionKey() throws IOException { + return namesystem.getBlockManager().generateDataEncryptionKey(); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java index 2628b203a52..90542124076 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java @@ -560,7 +560,8 @@ public class NamenodeFsck { block.getBlockId()); blockReader = BlockReaderFactory.newBlockReader( conf, s, file, block, lblock - .getBlockToken(), 0, -1); + .getBlockToken(), 0, -1, + namenode.getRpcServer().getDataEncryptionKey()); } catch (IOException ex) { // Put chosen node into dead list, continue diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto index 10f39eaa136..4fd4e2674ab 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto @@ -441,6 +441,12 @@ message SetBalancerBandwidthRequestProto { message SetBalancerBandwidthResponseProto { // void response } +message GetDataEncryptionKeyRequestProto { // no parameters +} + +message GetDataEncryptionKeyResponseProto { + required DataEncryptionKeyProto dataEncryptionKey = 1; +} service ClientNamenodeProtocol { rpc getBlockLocations(GetBlockLocationsRequestProto) @@ -511,6 +517,8 @@ service ClientNamenodeProtocol { returns(RenewDelegationTokenResponseProto); rpc cancelDelegationToken(CancelDelegationTokenRequestProto) returns(CancelDelegationTokenResponseProto); - rpc setBalancerBandwidth(SetBalancerBandwidthRequestProto) + rpc setBalancerBandwidth(SetBalancerBandwidthRequestProto) returns(SetBalancerBandwidthResponseProto); + rpc getDataEncryptionKey(GetDataEncryptionKeyRequestProto) + returns(GetDataEncryptionKeyResponseProto); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto index ef67cbd10d9..1d6d8508b0f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto @@ -25,6 +25,17 @@ option java_generate_equals_and_hash = true; import "hdfs.proto"; +message DataTransferEncryptorMessageProto { + enum DataTransferEncryptorStatus { + SUCCESS = 0; + ERROR_UNKNOWN_KEY = 1; + ERROR = 2; + } + required DataTransferEncryptorStatus status = 1; + optional bytes payload = 2; + optional string message = 3; +} + message BaseHeaderProto { required ExtendedBlockProto block = 1; optional BlockTokenIdentifierProto token = 2; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto index 4c4bdb5905e..a640ddaf49d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto @@ -126,7 +126,16 @@ message LocatedBlockProto { // their locations are not part of this object required BlockTokenIdentifierProto blockToken = 5; - } +} + +message DataEncryptionKeyProto { + required uint32 keyId = 1; + required string blockPoolId = 2; + required bytes nonce = 3; + required bytes encryptionKey = 4; + required uint64 expiryDate = 5; + optional string encryptionAlgorithm = 6; +} /** @@ -178,6 +187,7 @@ message FsServerDefaultsProto { required uint32 writePacketSize = 3; required uint32 replication = 4; // Actually a short - only 16 bits used required uint32 fileBufferSize = 5; + optional bool encryptDataTransfer = 6 [default = false]; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index fe6cbd57bb8..95e2f8c0814 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -1014,4 +1014,25 @@ + + dfs.encrypt.data.transfer + false + + Whether or not actual block data that is read/written from/to HDFS should + be encrypted on the wire. This only needs to be set on the NN and DNs, + clients will deduce this automatically. + + + + + dfs.encrypt.data.transfer.algorithm + + + This value may be set to either "3des" or "rc4". If nothing is set, then + the configured JCE default on the system is used (usually 3DES.) It is + widely believed that 3DES is more cryptographically secure, but RC4 is + substantially faster. + + + diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java index 24cac94f6e5..29d8063426e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java @@ -155,7 +155,7 @@ public class BlockReaderTestUtil { testBlock.getBlockToken(), offset, lenToRead, conf.getInt(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY, 4096), - true, ""); + true, "", null, null); } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientBlockVerification.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientBlockVerification.java index e7a1e14ddaf..8dd3d6fd38a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientBlockVerification.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientBlockVerification.java @@ -60,7 +60,7 @@ public class TestClientBlockVerification { RemoteBlockReader2 reader = (RemoteBlockReader2)spy( util.getBlockReader(testBlock, 0, FILE_SIZE_K * 1024)); util.readAndCheckEOS(reader, FILE_SIZE_K * 1024, true); - verify(reader).sendReadResult(reader.dnSock, Status.CHECKSUM_OK); + verify(reader).sendReadResult(Status.CHECKSUM_OK); reader.close(); } @@ -75,7 +75,7 @@ public class TestClientBlockVerification { // We asked the blockreader for the whole file, and only read // half of it, so no CHECKSUM_OK - verify(reader, never()).sendReadResult(reader.dnSock, Status.CHECKSUM_OK); + verify(reader, never()).sendReadResult(Status.CHECKSUM_OK); reader.close(); } @@ -91,7 +91,7 @@ public class TestClientBlockVerification { util.getBlockReader(testBlock, 0, FILE_SIZE_K * 1024 / 2)); // And read half the file util.readAndCheckEOS(reader, FILE_SIZE_K * 1024 / 2, true); - verify(reader).sendReadResult(reader.dnSock, Status.CHECKSUM_OK); + verify(reader).sendReadResult(Status.CHECKSUM_OK); reader.close(); } @@ -110,7 +110,7 @@ public class TestClientBlockVerification { RemoteBlockReader2 reader = (RemoteBlockReader2)spy( util.getBlockReader(testBlock, startOffset, length)); util.readAndCheckEOS(reader, length, true); - verify(reader).sendReadResult(reader.dnSock, Status.CHECKSUM_OK); + verify(reader).sendReadResult(Status.CHECKSUM_OK); reader.close(); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java index 354a17af9b1..327aa8fa22d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java @@ -168,13 +168,13 @@ public class TestConnCache { // Insert a socket to the NN Socket nnSock = new Socket(nnAddr.getAddress(), nnAddr.getPort()); - cache.put(nnSock); - assertSame("Read the write", nnSock, cache.get(nnAddr)); - cache.put(nnSock); + cache.put(nnSock, null); + assertSame("Read the write", nnSock, cache.get(nnAddr).sock); + cache.put(nnSock, null); // Insert DN socks for (Socket dnSock : dnSockets) { - cache.put(dnSock); + cache.put(dnSock, null); } assertEquals("NN socket evicted", null, cache.get(nnAddr)); @@ -182,7 +182,7 @@ public class TestConnCache { // Lookup the DN socks for (Socket dnSock : dnSockets) { - assertEquals("Retrieve cached sockets", dnSock, cache.get(dnAddr)); + assertEquals("Retrieve cached sockets", dnSock, cache.get(dnAddr).sock); dnSock.close(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferKeepalive.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferKeepalive.java index 5699c10171d..c8ab6e002fa 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferKeepalive.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferKeepalive.java @@ -113,7 +113,7 @@ public class TestDataTransferKeepalive { // Take it out of the cache - reading should // give an EOF. - Socket s = dfsClient.socketCache.get(dnAddr); + Socket s = dfsClient.socketCache.get(dnAddr).sock; assertNotNull(s); assertEquals(-1, NetUtils.getInputStream(s).read()); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestEncryptedTransfer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestEncryptedTransfer.java new file mode 100644 index 00000000000..0d21a37ccd6 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestEncryptedTransfer.java @@ -0,0 +1,459 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileChecksum; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager; +import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption; +import org.apache.hadoop.hdfs.server.datanode.DataNode; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.test.GenericTestUtils.LogCapturer; +import org.junit.Test; +import org.mockito.Mockito; + +public class TestEncryptedTransfer { + + private static final Log LOG = LogFactory.getLog(TestEncryptedTransfer.class); + + private static final String PLAIN_TEXT = "this is very secret plain text"; + private static final Path TEST_PATH = new Path("/non-encrypted-file"); + + private static void setEncryptionConfigKeys(Configuration conf) { + conf.setBoolean(DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY, true); + conf.setBoolean(DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true); + } + + // Unset DFS_ENCRYPT_DATA_TRANSFER_KEY and DFS_DATA_ENCRYPTION_ALGORITHM_KEY + // on the client side to ensure that clients will detect this setting + // automatically from the NN. + private static FileSystem getFileSystem(Configuration conf) throws IOException { + Configuration localConf = new Configuration(conf); + localConf.setBoolean(DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY, false); + localConf.unset(DFSConfigKeys.DFS_DATA_ENCRYPTION_ALGORITHM_KEY); + return FileSystem.get(localConf); + } + + @Test + public void testEncryptedRead() throws IOException { + MiniDFSCluster cluster = null; + try { + Configuration conf = new Configuration(); + cluster = new MiniDFSCluster.Builder(conf).build(); + + FileSystem fs = getFileSystem(conf); + writeTestDataToFile(fs); + assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH)); + FileChecksum checksum = fs.getFileChecksum(TEST_PATH); + fs.close(); + cluster.shutdown(); + + setEncryptionConfigKeys(conf); + + cluster = new MiniDFSCluster.Builder(conf) + .manageDataDfsDirs(false) + .manageNameDfsDirs(false) + .format(false) + .startupOption(StartupOption.REGULAR) + .build(); + + fs = getFileSystem(conf); + assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH)); + assertEquals(checksum, fs.getFileChecksum(TEST_PATH)); + fs.close(); + } finally { + if (cluster != null) { + cluster.shutdown(); + } + } + } + + @Test + public void testEncryptedReadWithRC4() throws IOException { + MiniDFSCluster cluster = null; + try { + Configuration conf = new Configuration(); + cluster = new MiniDFSCluster.Builder(conf).build(); + + FileSystem fs = getFileSystem(conf); + writeTestDataToFile(fs); + assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH)); + FileChecksum checksum = fs.getFileChecksum(TEST_PATH); + fs.close(); + cluster.shutdown(); + + setEncryptionConfigKeys(conf); + // It'll use 3DES by default, but we set it to rc4 here. + conf.set(DFSConfigKeys.DFS_DATA_ENCRYPTION_ALGORITHM_KEY, "rc4"); + + cluster = new MiniDFSCluster.Builder(conf) + .manageDataDfsDirs(false) + .manageNameDfsDirs(false) + .format(false) + .startupOption(StartupOption.REGULAR) + .build(); + + fs = getFileSystem(conf); + assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH)); + assertEquals(checksum, fs.getFileChecksum(TEST_PATH)); + fs.close(); + } finally { + if (cluster != null) { + cluster.shutdown(); + } + } + } + + @Test + public void testEncryptedReadAfterNameNodeRestart() throws IOException { + MiniDFSCluster cluster = null; + try { + Configuration conf = new Configuration(); + cluster = new MiniDFSCluster.Builder(conf).build(); + + FileSystem fs = getFileSystem(conf); + writeTestDataToFile(fs); + assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH)); + FileChecksum checksum = fs.getFileChecksum(TEST_PATH); + fs.close(); + cluster.shutdown(); + + setEncryptionConfigKeys(conf); + + cluster = new MiniDFSCluster.Builder(conf) + .manageDataDfsDirs(false) + .manageNameDfsDirs(false) + .format(false) + .startupOption(StartupOption.REGULAR) + .build(); + + fs = getFileSystem(conf); + assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH)); + assertEquals(checksum, fs.getFileChecksum(TEST_PATH)); + fs.close(); + + cluster.restartNameNode(); + fs = getFileSystem(conf); + assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH)); + assertEquals(checksum, fs.getFileChecksum(TEST_PATH)); + fs.close(); + } finally { + if (cluster != null) { + cluster.shutdown(); + } + } + } + + @Test + public void testClientThatDoesNotSupportEncryption() throws IOException { + MiniDFSCluster cluster = null; + try { + Configuration conf = new Configuration(); + cluster = new MiniDFSCluster.Builder(conf).build(); + + FileSystem fs = getFileSystem(conf); + writeTestDataToFile(fs); + assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH)); + fs.close(); + cluster.shutdown(); + + setEncryptionConfigKeys(conf); + + cluster = new MiniDFSCluster.Builder(conf) + .manageDataDfsDirs(false) + .manageNameDfsDirs(false) + .format(false) + .startupOption(StartupOption.REGULAR) + .build(); + + + fs = getFileSystem(conf); + DFSClient client = DFSClientAdapter.getDFSClient((DistributedFileSystem) fs); + DFSClient spyClient = Mockito.spy(client); + Mockito.doReturn(false).when(spyClient).shouldEncryptData(); + DFSClientAdapter.setDFSClient((DistributedFileSystem) fs, spyClient); + + LogCapturer logs = GenericTestUtils.LogCapturer.captureLogs( + LogFactory.getLog(DataNode.class)); + try { + assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH)); + fail("Should not have been able to read without encryption enabled."); + } catch (IOException ioe) { + GenericTestUtils.assertExceptionContains("Could not obtain block:", + ioe); + } finally { + logs.stopCapturing(); + } + fs.close(); + + GenericTestUtils.assertMatches(logs.getOutput(), + "Failed to read expected encryption handshake from client at"); + } finally { + if (cluster != null) { + cluster.shutdown(); + } + } + } + + @Test + public void testLongLivedReadClientAfterRestart() throws IOException { + MiniDFSCluster cluster = null; + try { + Configuration conf = new Configuration(); + cluster = new MiniDFSCluster.Builder(conf).build(); + + FileSystem fs = getFileSystem(conf); + writeTestDataToFile(fs); + assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH)); + FileChecksum checksum = fs.getFileChecksum(TEST_PATH); + fs.close(); + cluster.shutdown(); + + setEncryptionConfigKeys(conf); + + cluster = new MiniDFSCluster.Builder(conf) + .manageDataDfsDirs(false) + .manageNameDfsDirs(false) + .format(false) + .startupOption(StartupOption.REGULAR) + .build(); + + fs = getFileSystem(conf); + assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH)); + assertEquals(checksum, fs.getFileChecksum(TEST_PATH)); + + // Restart the NN and DN, after which the client's encryption key will no + // longer be valid. + cluster.restartNameNode(); + assertTrue(cluster.restartDataNode(0)); + + assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH)); + assertEquals(checksum, fs.getFileChecksum(TEST_PATH)); + + fs.close(); + } finally { + if (cluster != null) { + cluster.shutdown(); + } + } + } + + @Test + public void testLongLivedWriteClientAfterRestart() throws IOException { + MiniDFSCluster cluster = null; + try { + Configuration conf = new Configuration(); + setEncryptionConfigKeys(conf); + cluster = new MiniDFSCluster.Builder(conf).build(); + + FileSystem fs = getFileSystem(conf); + + writeTestDataToFile(fs); + assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH)); + + // Restart the NN and DN, after which the client's encryption key will no + // longer be valid. + cluster.restartNameNode(); + assertTrue(cluster.restartDataNodes()); + cluster.waitActive(); + + writeTestDataToFile(fs); + assertEquals(PLAIN_TEXT + PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH)); + + fs.close(); + } finally { + if (cluster != null) { + cluster.shutdown(); + } + } + } + + @Test + public void testLongLivedClient() throws IOException, InterruptedException { + MiniDFSCluster cluster = null; + try { + Configuration conf = new Configuration(); + cluster = new MiniDFSCluster.Builder(conf).build(); + + FileSystem fs = getFileSystem(conf); + writeTestDataToFile(fs); + assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH)); + FileChecksum checksum = fs.getFileChecksum(TEST_PATH); + fs.close(); + cluster.shutdown(); + + setEncryptionConfigKeys(conf); + + cluster = new MiniDFSCluster.Builder(conf) + .manageDataDfsDirs(false) + .manageNameDfsDirs(false) + .format(false) + .startupOption(StartupOption.REGULAR) + .build(); + + BlockTokenSecretManager btsm = cluster.getNamesystem().getBlockManager() + .getBlockTokenSecretManager(); + btsm.setKeyUpdateIntervalForTesting(2 * 1000); + btsm.setTokenLifetime(2 * 1000); + btsm.clearAllKeysForTesting(); + + fs = getFileSystem(conf); + assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH)); + assertEquals(checksum, fs.getFileChecksum(TEST_PATH)); + + // Sleep for 15 seconds, after which the encryption key will no longer be + // valid. It needs to be a few multiples of the block token lifetime, + // since several block tokens are valid at any given time (the current + // and the last two, by default.) + LOG.info("Sleeping so that encryption keys expire..."); + Thread.sleep(15 * 1000); + LOG.info("Done sleeping."); + + assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH)); + assertEquals(checksum, fs.getFileChecksum(TEST_PATH)); + + fs.close(); + } finally { + if (cluster != null) { + cluster.shutdown(); + } + } + } + + @Test + public void testEncryptedWriteWithOneDn() throws IOException { + testEncryptedWrite(1); + } + + @Test + public void testEncryptedWriteWithTwoDns() throws IOException { + testEncryptedWrite(2); + } + + @Test + public void testEncryptedWriteWithMultipleDns() throws IOException { + testEncryptedWrite(10); + } + + private void testEncryptedWrite(int numDns) throws IOException { + MiniDFSCluster cluster = null; + try { + Configuration conf = new Configuration(); + setEncryptionConfigKeys(conf); + + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDns).build(); + + FileSystem fs = getFileSystem(conf); + writeTestDataToFile(fs); + assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH)); + fs.close(); + } finally { + if (cluster != null) { + cluster.shutdown(); + } + } + } + + @Test + public void testEncryptedAppend() throws IOException { + MiniDFSCluster cluster = null; + try { + Configuration conf = new Configuration(); + setEncryptionConfigKeys(conf); + + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build(); + + FileSystem fs = getFileSystem(conf); + + writeTestDataToFile(fs); + assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH)); + + writeTestDataToFile(fs); + assertEquals(PLAIN_TEXT + PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH)); + + fs.close(); + } finally { + if (cluster != null) { + cluster.shutdown(); + } + } + } + + @Test + public void testEncryptedAppendRequiringBlockTransfer() throws IOException { + MiniDFSCluster cluster = null; + try { + Configuration conf = new Configuration(); + setEncryptionConfigKeys(conf); + + // start up 4 DNs + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(4).build(); + + FileSystem fs = getFileSystem(conf); + + // Create a file with replication 3, so its block is on 3 / 4 DNs. + writeTestDataToFile(fs); + assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH)); + + // Shut down one of the DNs holding a block replica. + FSDataInputStream in = fs.open(TEST_PATH); + List locatedBlocks = DFSTestUtil.getAllBlocks(in); + in.close(); + assertEquals(1, locatedBlocks.size()); + assertEquals(3, locatedBlocks.get(0).getLocations().length); + DataNode dn = cluster.getDataNode(locatedBlocks.get(0).getLocations()[0].getIpcPort()); + dn.shutdown(); + + // Reopen the file for append, which will need to add another DN to the + // pipeline and in doing so trigger a block transfer. + writeTestDataToFile(fs); + assertEquals(PLAIN_TEXT + PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH)); + + fs.close(); + } finally { + if (cluster != null) { + cluster.shutdown(); + } + } + } + + private static void writeTestDataToFile(FileSystem fs) throws IOException { + OutputStream out = null; + if (!fs.exists(TEST_PATH)) { + out = fs.create(TEST_PATH); + } else { + out = fs.append(TEST_PATH); + } + out.write(PLAIN_TEXT.getBytes()); + out.close(); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java index e4731c9d821..4b9f2f4faaa 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java @@ -162,7 +162,7 @@ public class TestBlockToken { public void testWritable() throws Exception { TestWritable.testWritable(new BlockTokenIdentifier()); BlockTokenSecretManager sm = new BlockTokenSecretManager( - blockKeyUpdateInterval, blockTokenLifetime, 0); + blockKeyUpdateInterval, blockTokenLifetime, 0, "fake-pool", null); TestWritable.testWritable(generateTokenId(sm, block1, EnumSet.allOf(BlockTokenSecretManager.AccessMode.class))); TestWritable.testWritable(generateTokenId(sm, block2, @@ -201,9 +201,9 @@ public class TestBlockToken { @Test public void testBlockTokenSecretManager() throws Exception { BlockTokenSecretManager masterHandler = new BlockTokenSecretManager( - blockKeyUpdateInterval, blockTokenLifetime, 0); + blockKeyUpdateInterval, blockTokenLifetime, 0, "fake-pool", null); BlockTokenSecretManager slaveHandler = new BlockTokenSecretManager( - blockKeyUpdateInterval, blockTokenLifetime); + blockKeyUpdateInterval, blockTokenLifetime, "fake-pool", null); ExportedBlockKeys keys = masterHandler.exportKeys(); slaveHandler.addKeys(keys); tokenGenerationAndVerification(masterHandler, slaveHandler); @@ -238,7 +238,7 @@ public class TestBlockToken { @Test public void testBlockTokenRpc() throws Exception { BlockTokenSecretManager sm = new BlockTokenSecretManager( - blockKeyUpdateInterval, blockTokenLifetime, 0); + blockKeyUpdateInterval, blockTokenLifetime, 0, "fake-pool", null); Token token = sm.generateToken(block3, EnumSet.allOf(BlockTokenSecretManager.AccessMode.class)); @@ -273,7 +273,7 @@ public class TestBlockToken { public void testBlockTokenRpcLeak() throws Exception { Assume.assumeTrue(FD_DIR.exists()); BlockTokenSecretManager sm = new BlockTokenSecretManager( - blockKeyUpdateInterval, blockTokenLifetime, 0); + blockKeyUpdateInterval, blockTokenLifetime, 0, "fake-pool", null); Token token = sm.generateToken(block3, EnumSet.allOf(BlockTokenSecretManager.AccessMode.class)); @@ -342,9 +342,9 @@ public class TestBlockToken { for (int i = 0; i < 10; i++) { String bpid = Integer.toString(i); BlockTokenSecretManager masterHandler = new BlockTokenSecretManager( - blockKeyUpdateInterval, blockTokenLifetime, 0); + blockKeyUpdateInterval, blockTokenLifetime, 0, "fake-pool", null); BlockTokenSecretManager slaveHandler = new BlockTokenSecretManager( - blockKeyUpdateInterval, blockTokenLifetime); + blockKeyUpdateInterval, blockTokenLifetime, "fake-pool", null); bpMgr.addBlockPool(bpid, slaveHandler); ExportedBlockKeys keys = masterHandler.exportKeys(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java index 9ce8bf448c3..49294089796 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java @@ -396,7 +396,10 @@ public class TestBalancer { * then a new empty node is added to the cluster*/ @Test public void testBalancer0() throws Exception { - Configuration conf = new HdfsConfiguration(); + testBalancer0Internal(new HdfsConfiguration()); + } + + void testBalancer0Internal(Configuration conf) throws Exception { initConf(conf); oneNodeTest(conf); twoNodeTest(conf); @@ -405,7 +408,10 @@ public class TestBalancer { /** Test unevenly distributed cluster */ @Test public void testBalancer1() throws Exception { - Configuration conf = new HdfsConfiguration(); + testBalancer1Internal(new HdfsConfiguration()); + } + + void testBalancer1Internal(Configuration conf) throws Exception { initConf(conf); testUnevenDistribution(conf, new long[] {50*CAPACITY/100, 10*CAPACITY/100}, @@ -415,7 +421,10 @@ public class TestBalancer { @Test public void testBalancer2() throws Exception { - Configuration conf = new HdfsConfiguration(); + testBalancer2Internal(new HdfsConfiguration()); + } + + void testBalancer2Internal(Configuration conf) throws Exception { initConf(conf); testBalancerDefaultConstructor(conf, new long[] { CAPACITY, CAPACITY }, new String[] { RACK0, RACK1 }, CAPACITY, RACK2); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithEncryptedTransfer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithEncryptedTransfer.java new file mode 100644 index 00000000000..30c7ce9c810 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithEncryptedTransfer.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.balancer; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.junit.Before; +import org.junit.Test; + +public class TestBalancerWithEncryptedTransfer { + + private Configuration conf = new HdfsConfiguration(); + + @Before + public void setUpConf() { + conf.setBoolean(DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY, true); + conf.setBoolean(DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true); + } + + @Test + public void testEncryptedBalancer0() throws Exception { + new TestBalancer().testBalancer0Internal(conf); + } + + @Test + public void testEncryptedBalancer1() throws Exception { + new TestBalancer().testBalancer1Internal(conf); + } + + @Test + public void testEncryptedBalancer2() throws Exception { + new TestBalancer().testBalancer2Internal(conf); + } + +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java index 687e813e25c..c7dbf200b13 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java @@ -146,7 +146,7 @@ public class TestBlockTokenWithDFS { "test-blockpoolid", block.getBlockId()); blockReader = BlockReaderFactory.newBlockReader( conf, s, file, block, - lblock.getBlockToken(), 0, -1); + lblock.getBlockToken(), 0, -1, null); } catch (IOException ex) { if (ex instanceof InvalidBlockTokenException) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java index 49ef37049d7..ac1cf034338 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java @@ -281,7 +281,7 @@ public class TestDataNodeVolumeFailure { "test-blockpoolid", block.getBlockId()); BlockReaderFactory.newBlockReader(conf, s, file, block, lblock - .getBlockToken(), 0, -1); + .getBlockToken(), 0, -1, null); // nothing - if it fails - it will throw and exception }