diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
index 4abf234de5a..6244ee40717 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
@@ -1772,6 +1772,11 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
     }
   }
 
+  @VisibleForTesting
+  public DataEncryptionKey getEncryptionKey() {
+    return encryptionKey;
+  }
+
   /**
    * Get the checksum of the whole file of a range of the file. Note that the
    * range always starts from the beginning of the file.
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
index bdd20c4c32f..2a7f8c0133f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
@@ -124,6 +124,89 @@ import javax.annotation.Nonnull;
 class DataStreamer extends Daemon {
   static final Logger LOG = LoggerFactory.getLogger(DataStreamer.class);
 
+  private class RefetchEncryptionKeyPolicy {
+    private int fetchEncryptionKeyTimes = 0;
+    private InvalidEncryptionKeyException lastException;
+    private final DatanodeInfo src;
+
+    RefetchEncryptionKeyPolicy(DatanodeInfo src) {
+      this.src = src;
+    }
+    boolean continueRetryingOrThrow() throws InvalidEncryptionKeyException {
+      if (fetchEncryptionKeyTimes >= 2) {
+        // hit the same exception twice connecting to the node, so
+        // throw the exception and exclude the node.
+        throw lastException;
+      }
+      // Don't exclude this node just yet.
+      // Try again with a new encryption key.
+      LOG.info("Will fetch a new encryption key and retry, "
+          + "encryption key was invalid when connecting to "
+          + this.src + ": ", lastException);
+      // The encryption key used is invalid.
+      dfsClient.clearDataEncryptionKey();
+      return true;
+    }
+
+    /**
+     * Record a connection exception.
+     * @param e
+     * @throws InvalidEncryptionKeyException
+     */
+    void recordFailure(final InvalidEncryptionKeyException e)
+        throws InvalidEncryptionKeyException {
+      fetchEncryptionKeyTimes++;
+      lastException = e;
+    }
+  }
+
+  private class StreamerStreams implements java.io.Closeable {
+    private Socket sock = null;
+    private DataOutputStream out = null;
+    private DataInputStream in = null;
+
+    StreamerStreams(final DatanodeInfo src,
+        final long writeTimeout, final long readTimeout,
+        final Token<BlockTokenIdentifier> blockToken)
+        throws IOException {
+      sock = createSocketForPipeline(src, 2, dfsClient);
+
+      OutputStream unbufOut = NetUtils.getOutputStream(sock, writeTimeout);
+      InputStream unbufIn = NetUtils.getInputStream(sock, readTimeout);
+      IOStreamPair saslStreams = dfsClient.saslClient
+          .socketSend(sock, unbufOut, unbufIn, dfsClient, blockToken, src);
+      unbufOut = saslStreams.out;
+      unbufIn = saslStreams.in;
+      out = new DataOutputStream(new BufferedOutputStream(unbufOut,
+          DFSUtilClient.getSmallBufferSize(dfsClient.getConfiguration())));
+      in = new DataInputStream(unbufIn);
+    }
+
+    void sendTransferBlock(final DatanodeInfo[] targets,
+        final StorageType[] targetStorageTypes,
+        final Token<BlockTokenIdentifier> blockToken) throws IOException {
+      //send the TRANSFER_BLOCK request
+      new Sender(out)
+          .transferBlock(block, blockToken, dfsClient.clientName, targets,
+              targetStorageTypes);
+      out.flush();
+      //ack
+      BlockOpResponseProto transferResponse = BlockOpResponseProto
+          .parseFrom(PBHelperClient.vintPrefixed(in));
+      if (SUCCESS != transferResponse.getStatus()) {
+        throw new IOException("Failed to add a datanode. Response status: "
+            + transferResponse.getStatus());
+      }
+    }
+
+    @Override
+    public void close() throws IOException {
+      IOUtils.closeStream(in);
+      IOUtils.closeStream(out);
+      IOUtils.closeSocket(sock);
+    }
+  }
+
   /**
    * Create a socket for a write pipeline
    *
@@ -1237,50 +1320,39 @@ class DataStreamer extends Daemon {
         new IOException("Failed to add a node");
   }
 
+  private long computeTransferWriteTimeout() {
+    return dfsClient.getDatanodeWriteTimeout(2);
+  }
+  private long computeTransferReadTimeout() {
+    // transfer timeout multiplier based on the transfer size
+    // One per 200 packets = 12.8MB. Minimum is 2.
+    int multi = 2
+        + (int) (bytesSent / dfsClient.getConf().getWritePacketSize()) / 200;
+    return dfsClient.getDatanodeReadTimeout(multi);
+  }
+
   private void transfer(final DatanodeInfo src, final DatanodeInfo[] targets,
       final StorageType[] targetStorageTypes,
       final Token<BlockTokenIdentifier> blockToken)
       throws IOException {
     //transfer replica to the new datanode
-    Socket sock = null;
-    DataOutputStream out = null;
-    DataInputStream in = null;
-    try {
-      sock = createSocketForPipeline(src, 2, dfsClient);
-      final long writeTimeout = dfsClient.getDatanodeWriteTimeout(2);
+    RefetchEncryptionKeyPolicy policy = new RefetchEncryptionKeyPolicy(src);
+    do {
+      StreamerStreams streams = null;
+      try {
+        final long writeTimeout = computeTransferWriteTimeout();
+        final long readTimeout = computeTransferReadTimeout();
 
-      // transfer timeout multiplier based on the transfer size
-      // One per 200 packets = 12.8MB. Minimum is 2.
-      int multi = 2 + (int)(bytesSent /dfsClient.getConf().getWritePacketSize())
-          / 200;
-      final long readTimeout = dfsClient.getDatanodeReadTimeout(multi);
-
-      OutputStream unbufOut = NetUtils.getOutputStream(sock, writeTimeout);
-      InputStream unbufIn = NetUtils.getInputStream(sock, readTimeout);
-      IOStreamPair saslStreams = dfsClient.saslClient.socketSend(sock,
-          unbufOut, unbufIn, dfsClient, blockToken, src);
-      unbufOut = saslStreams.out;
-      unbufIn = saslStreams.in;
-      out = new DataOutputStream(new BufferedOutputStream(unbufOut,
-          DFSUtilClient.getSmallBufferSize(dfsClient.getConfiguration())));
-      in = new DataInputStream(unbufIn);
-
-      //send the TRANSFER_BLOCK request
-      new Sender(out).transferBlock(block, blockToken, dfsClient.clientName,
-          targets, targetStorageTypes);
-      out.flush();
-
-      //ack
-      BlockOpResponseProto response =
-          BlockOpResponseProto.parseFrom(PBHelperClient.vintPrefixed(in));
-      if (SUCCESS != response.getStatus()) {
-        throw new IOException("Failed to add a datanode");
+        streams = new StreamerStreams(src, writeTimeout, readTimeout,
+            blockToken);
+        streams.sendTransferBlock(targets, targetStorageTypes, blockToken);
+        return;
+      } catch (InvalidEncryptionKeyException e) {
+        policy.recordFailure(e);
+      } finally {
+        IOUtils.closeStream(streams);
       }
-    } finally {
-      IOUtils.closeStream(in);
-      IOUtils.closeStream(out);
-      IOUtils.closeSocket(sock);
-    }
+    } while (policy.continueRetryingOrThrow());
   }
 
   /**
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockPoolTokenSecretManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockPoolTokenSecretManager.java
index cdfe7ec54b1..3e315eec56f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockPoolTokenSecretManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockPoolTokenSecretManager.java
@@ -49,7 +49,8 @@ public class BlockPoolTokenSecretManager extends
     map.put(bpid, secretMgr);
   }
 
-  synchronized BlockTokenSecretManager get(String bpid) {
+  @VisibleForTesting
+  public synchronized BlockTokenSecretManager get(String bpid) {
     BlockTokenSecretManager secretMgr = map.get(bpid);
     if (secretMgr == null) {
       throw new IllegalArgumentException("Block pool " + bpid
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java
index b103c1abc9a..c25f1b44893 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java
@@ -436,6 +436,12 @@ public class BlockTokenSecretManager extends
     allKeys.clear();
   }
 
+  @VisibleForTesting
+  public synchronized boolean hasKey(int keyId) {
+    BlockKey key = allKeys.get(keyId);
+    return key != null;
+  }
+
   @VisibleForTesting
   public synchronized int getSerialNoForTesting() {
     return serialNo;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index 4c38b895243..79742accbae 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -2708,6 +2708,11 @@ public class DataNode extends ReconfigurableBase
     return directoryScanner;
   }
 
+  @VisibleForTesting
+  public BlockPoolTokenSecretManager getBlockPoolTokenSecretManager() {
+    return blockPoolTokenSecretManager;
+  }
+
   public static void secureMain(String args[], SecureResources resources) {
     int errorCode = 0;
     try {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestEncryptedTransfer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestEncryptedTransfer.java
index 9c02e9f1a4e..e9a7c420dff 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestEncryptedTransfer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestEncryptedTransfer.java
@@ -18,24 +18,31 @@ package org.apache.hadoop.hdfs;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.Mockito.times;
 
 import java.io.IOException;
 import java.io.OutputStream;
 import java.net.InetAddress;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
+import java.util.concurrent.TimeoutException;
 
+import com.google.common.base.Supplier;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileChecksum;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver;
 import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil;
@@ -45,9 +52,14 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.test.GenericTestUtils.LogCapturer;
+import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
 import org.apache.log4j.Level;
 import org.apache.log4j.LogManager;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.Timeout;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameters;
@@ -59,6 +71,9 @@ public class TestEncryptedTransfer {
     LogManager.getLogger(SaslDataTransferServer.class).setLevel(Level.DEBUG);
     LogManager.getLogger(DataTransferSaslUtil.class).setLevel(Level.DEBUG);
   }
+
+  @Rule
+  public Timeout timeout = new Timeout(300000);
 
   @Parameters
   public static Collection<Object[]> data() {
@@ -72,8 +87,12 @@ public class TestEncryptedTransfer {
   private static final String PLAIN_TEXT = "this is very secret plain text";
   private static final Path TEST_PATH = new Path("/non-encrypted-file");
 
+  private MiniDFSCluster cluster = null;
+  private Configuration conf = null;
+  private FileSystem fs = null;
+
-  private void setEncryptionConfigKeys(Configuration conf) {
+  private void setEncryptionConfigKeys() {
     conf.setBoolean(DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY, true);
     conf.setBoolean(DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);
     if (resolverClazz != null){
@@ -96,505 +115,360 @@ public class TestEncryptedTransfer {
     this.resolverClazz = resolverClazz;
   }
 
-  @Test
-  public void testEncryptedRead() throws IOException {
-    MiniDFSCluster cluster = null;
-    try {
-      Configuration conf = new Configuration();
-      cluster = new MiniDFSCluster.Builder(conf).build();
-
-      FileSystem fs = getFileSystem(conf);
-      writeTestDataToFile(fs);
-      assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
-      FileChecksum checksum = fs.getFileChecksum(TEST_PATH);
+  @Before
+  public void setup() throws IOException {
+    conf = new Configuration();
+  }
+
+  @After
+  public void teardown() throws IOException {
+    if (fs != null) {
       fs.close();
+    }
+    if (cluster != null) {
       cluster.shutdown();
-
-      setEncryptionConfigKeys(conf);
-
-      cluster = new MiniDFSCluster.Builder(conf)
-          .manageDataDfsDirs(false)
-          .manageNameDfsDirs(false)
-          .format(false)
-          .startupOption(StartupOption.REGULAR)
-          .build();
-
-      fs = getFileSystem(conf);
-      LogCapturer logs = GenericTestUtils.LogCapturer.captureLogs(
-          LogFactory.getLog(SaslDataTransferServer.class));
-      LogCapturer logs1 = GenericTestUtils.LogCapturer.captureLogs(
-          LogFactory.getLog(DataTransferSaslUtil.class));
-      try {
-        assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
-        assertEquals(checksum, fs.getFileChecksum(TEST_PATH));
-      } finally {
-        logs.stopCapturing();
-        logs1.stopCapturing();
-      }
-
-      fs.close();
-
-      if (resolverClazz == null) {
-        // Test client and server negotiate cipher option
-        GenericTestUtils.assertDoesNotMatch(logs.getOutput(),
-            "Server using cipher suite");
-        // Check the IOStreamPair
-        GenericTestUtils.assertDoesNotMatch(logs1.getOutput(),
-            "Creating IOStreamPair of CryptoInputStream and CryptoOutputStream.");
-      }
-    } finally {
-      if (cluster != null) {
-        cluster.shutdown();
-      }
     }
   }
-
-  @Test
-  public void testEncryptedReadWithRC4() throws IOException {
-    MiniDFSCluster cluster = null;
-    try {
-      Configuration conf = new Configuration();
-      cluster = new MiniDFSCluster.Builder(conf).build();
-
-      FileSystem fs = getFileSystem(conf);
-      writeTestDataToFile(fs);
-      assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
-      FileChecksum checksum = fs.getFileChecksum(TEST_PATH);
-      fs.close();
-      cluster.shutdown();
-
-      setEncryptionConfigKeys(conf);
-      // It'll use 3DES by default, but we set it to rc4 here.
-      conf.set(DFSConfigKeys.DFS_DATA_ENCRYPTION_ALGORITHM_KEY, "rc4");
-
-      cluster = new MiniDFSCluster.Builder(conf)
-          .manageDataDfsDirs(false)
-          .manageNameDfsDirs(false)
-          .format(false)
-          .startupOption(StartupOption.REGULAR)
-          .build();
-
-      fs = getFileSystem(conf);
-      LogCapturer logs = GenericTestUtils.LogCapturer.captureLogs(
-          LogFactory.getLog(SaslDataTransferServer.class));
-      LogCapturer logs1 = GenericTestUtils.LogCapturer.captureLogs(
-          LogFactory.getLog(DataTransferSaslUtil.class));
-      try {
-        assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
-        assertEquals(checksum, fs.getFileChecksum(TEST_PATH));
-      } finally {
-        logs.stopCapturing();
-        logs1.stopCapturing();
-      }
-      fs.close();
+  private FileChecksum writeUnencryptedAndThenRestartEncryptedCluster()
+      throws IOException {
+    cluster = new MiniDFSCluster.Builder(conf).build();
 
-      if (resolverClazz == null) {
-        // Test client and server negotiate cipher option
-        GenericTestUtils.assertDoesNotMatch(logs.getOutput(),
-            "Server using cipher suite");
-        // Check the IOStreamPair
-        GenericTestUtils.assertDoesNotMatch(logs1.getOutput(),
-            "Creating IOStreamPair of CryptoInputStream and CryptoOutputStream.");
-      }
-    } finally {
-      if (cluster != null) {
-        cluster.shutdown();
-      }
-    }
+    fs = getFileSystem(conf);
+    writeTestDataToFile(fs);
+    assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
+    FileChecksum checksum = fs.getFileChecksum(TEST_PATH);
+    fs.close();
+    cluster.shutdown();
+
+    setEncryptionConfigKeys();
+
+    cluster = new MiniDFSCluster.Builder(conf)
+        .manageDataDfsDirs(false)
+        .manageNameDfsDirs(false)
+        .format(false)
+        .startupOption(StartupOption.REGULAR)
+        .build();
+
+    fs = getFileSystem(conf);
+    return checksum;
   }
-
-  @Test
-  public void testEncryptedReadWithAES() throws IOException {
-    MiniDFSCluster cluster = null;
+
+  private void testEncryptedRead(String algorithm, String cipherSuite,
+      boolean matchLog, boolean readAfterRestart)
+      throws IOException {
+    // set encryption algorithm and cipher suites, but don't enable transfer
+    // encryption yet.
+    conf.set(DFSConfigKeys.DFS_DATA_ENCRYPTION_ALGORITHM_KEY, algorithm);
+    conf.set(HdfsClientConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY,
+        cipherSuite);
+
+    FileChecksum checksum = writeUnencryptedAndThenRestartEncryptedCluster();
+
+    LogCapturer logs = GenericTestUtils.LogCapturer.captureLogs(
+        LogFactory.getLog(SaslDataTransferServer.class));
+    LogCapturer logs1 = GenericTestUtils.LogCapturer.captureLogs(
+        LogFactory.getLog(DataTransferSaslUtil.class));
     try {
-      Configuration conf = new Configuration();
-      conf.set(HdfsClientConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY,
-          "AES/CTR/NoPadding");
-      cluster = new MiniDFSCluster.Builder(conf).build();
-
-      FileSystem fs = getFileSystem(conf);
-      writeTestDataToFile(fs);
       assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
-      FileChecksum checksum = fs.getFileChecksum(TEST_PATH);
-      fs.close();
-      cluster.shutdown();
+      assertEquals(checksum, fs.getFileChecksum(TEST_PATH));
+    } finally {
+      logs.stopCapturing();
+      logs1.stopCapturing();
+    }
 
-      setEncryptionConfigKeys(conf);
-
-      cluster = new MiniDFSCluster.Builder(conf)
-          .manageDataDfsDirs(false)
-          .manageNameDfsDirs(false)
-          .format(false)
-          .startupOption(StartupOption.REGULAR)
-          .build();
-
-      fs = getFileSystem(conf);
-      LogCapturer logs = GenericTestUtils.LogCapturer.captureLogs(
-          LogFactory.getLog(SaslDataTransferServer.class));
-      LogCapturer logs1 = GenericTestUtils.LogCapturer.captureLogs(
-          LogFactory.getLog(DataTransferSaslUtil.class));
-      try {
-        assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
-        assertEquals(checksum, fs.getFileChecksum(TEST_PATH));
-      } finally {
-        logs.stopCapturing();
-        logs1.stopCapturing();
-      }
-
-      fs.close();
-
-      if (resolverClazz == null) {
+    if (resolverClazz == null) {
+      if (matchLog) {
         // Test client and server negotiate cipher option
-        GenericTestUtils.assertMatches(logs.getOutput(),
-            "Server using cipher suite");
+        GenericTestUtils
+            .assertMatches(logs.getOutput(), "Server using cipher suite");
         // Check the IOStreamPair
         GenericTestUtils.assertMatches(logs1.getOutput(),
             "Creating IOStreamPair of CryptoInputStream and CryptoOutputStream.");
-      }
-    } finally {
-      if (cluster != null) {
-        cluster.shutdown();
+      } else {
+        // Test client and server negotiate cipher option
+        GenericTestUtils
+            .assertDoesNotMatch(logs.getOutput(), "Server using cipher suite");
+        // Check the IOStreamPair
+        GenericTestUtils.assertDoesNotMatch(logs1.getOutput(),
+            "Creating IOStreamPair of CryptoInputStream and CryptoOutputStream.");
       }
     }
+
+    if (readAfterRestart) {
+      cluster.restartNameNode();
+      fs = getFileSystem(conf);
+      assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
+      assertEquals(checksum, fs.getFileChecksum(TEST_PATH));
+    }
+  }
+
+  @Test
+  public void testEncryptedReadDefaultAlgorithmCipherSuite()
+      throws IOException {
+    testEncryptedRead("", "", false, false);
+  }
+
+  @Test
+  public void testEncryptedReadWithRC4() throws IOException {
+    testEncryptedRead("rc4", "", false, false);
+  }
+
+  @Test
+  public void testEncryptedReadWithAES() throws IOException {
+    testEncryptedRead("", "AES/CTR/NoPadding", true, false);
   }
 
   @Test
   public void testEncryptedReadAfterNameNodeRestart() throws IOException {
-    MiniDFSCluster cluster = null;
-    try {
-      Configuration conf = new Configuration();
-      cluster = new MiniDFSCluster.Builder(conf).build();
-
-      FileSystem fs = getFileSystem(conf);
-      writeTestDataToFile(fs);
-      assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
-      FileChecksum checksum = fs.getFileChecksum(TEST_PATH);
-      fs.close();
-      cluster.shutdown();
-
-      setEncryptionConfigKeys(conf);
-
-      cluster = new MiniDFSCluster.Builder(conf)
-          .manageDataDfsDirs(false)
-          .manageNameDfsDirs(false)
-          .format(false)
-          .startupOption(StartupOption.REGULAR)
-          .build();
-
-      fs = getFileSystem(conf);
-      assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
-      assertEquals(checksum, fs.getFileChecksum(TEST_PATH));
-      fs.close();
-
-      cluster.restartNameNode();
-      fs = getFileSystem(conf);
-      assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
-      assertEquals(checksum, fs.getFileChecksum(TEST_PATH));
-      fs.close();
-    } finally {
-      if (cluster != null) {
-        cluster.shutdown();
-      }
-    }
+    testEncryptedRead("", "", false, true);
   }
-
+
   @Test
   public void testClientThatDoesNotSupportEncryption() throws IOException {
-    MiniDFSCluster cluster = null;
+    // Set short retry timeouts so this test runs faster
+    conf.setInt(HdfsClientConfigKeys.Retry.WINDOW_BASE_KEY, 10);
+
+    writeUnencryptedAndThenRestartEncryptedCluster();
+
+    DFSClient client = DFSClientAdapter.getDFSClient((DistributedFileSystem)fs);
+    DFSClient spyClient = Mockito.spy(client);
+    Mockito.doReturn(false).when(spyClient).shouldEncryptData();
+    DFSClientAdapter.setDFSClient((DistributedFileSystem) fs, spyClient);
+
+    LogCapturer logs = GenericTestUtils.LogCapturer.captureLogs(
+        LogFactory.getLog(DataNode.class));
     try {
-      Configuration conf = new Configuration();
-      // Set short retry timeouts so this test runs faster
-      conf.setInt(HdfsClientConfigKeys.Retry.WINDOW_BASE_KEY, 10);
-      cluster = new MiniDFSCluster.Builder(conf).build();
-
-      FileSystem fs = getFileSystem(conf);
-      writeTestDataToFile(fs);
       assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
-      fs.close();
-      cluster.shutdown();
-
-      setEncryptionConfigKeys(conf);
-
-      cluster = new MiniDFSCluster.Builder(conf)
-          .manageDataDfsDirs(false)
-          .manageNameDfsDirs(false)
-          .format(false)
-          .startupOption(StartupOption.REGULAR)
-          .build();
-
-
-      fs = getFileSystem(conf);
-      DFSClient client = DFSClientAdapter.getDFSClient((DistributedFileSystem) fs);
-      DFSClient spyClient = Mockito.spy(client);
-      Mockito.doReturn(false).when(spyClient).shouldEncryptData();
-      DFSClientAdapter.setDFSClient((DistributedFileSystem) fs, spyClient);
-
-      LogCapturer logs = GenericTestUtils.LogCapturer.captureLogs(
-          LogFactory.getLog(DataNode.class));
-      try {
-        assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
-        if (resolverClazz != null && !resolverClazz.endsWith("TestTrustedChannelResolver")){
-          fail("Should not have been able to read without encryption enabled.");
-        }
-      } catch (IOException ioe) {
-        GenericTestUtils.assertExceptionContains("Could not obtain block:",
-            ioe);
-      } finally {
-        logs.stopCapturing();
-      }
-      fs.close();
-
-      if (resolverClazz == null) {
-        GenericTestUtils.assertMatches(logs.getOutput(),
-            "Failed to read expected encryption handshake from client at");
+      if (resolverClazz != null &&
+          !resolverClazz.endsWith("TestTrustedChannelResolver")){
+        fail("Should not have been able to read without encryption enabled.");
       }
+    } catch (IOException ioe) {
+      GenericTestUtils.assertExceptionContains("Could not obtain block:",
+          ioe);
     } finally {
-      if (cluster != null) {
-        cluster.shutdown();
-      }
+      logs.stopCapturing();
+    }
+
+    if (resolverClazz == null) {
+      GenericTestUtils.assertMatches(logs.getOutput(),
+          "Failed to read expected encryption handshake from client at");
     }
   }
-
+
   @Test
   public void testLongLivedReadClientAfterRestart() throws IOException {
-    MiniDFSCluster cluster = null;
-    try {
-      Configuration conf = new Configuration();
-      cluster = new MiniDFSCluster.Builder(conf).build();
-
-      FileSystem fs = getFileSystem(conf);
-      writeTestDataToFile(fs);
-      assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
-      FileChecksum checksum = fs.getFileChecksum(TEST_PATH);
-      fs.close();
-      cluster.shutdown();
-
-      setEncryptionConfigKeys(conf);
-
-      cluster = new MiniDFSCluster.Builder(conf)
-          .manageDataDfsDirs(false)
-          .manageNameDfsDirs(false)
-          .format(false)
-          .startupOption(StartupOption.REGULAR)
-          .build();
-
-      fs = getFileSystem(conf);
-      assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
-      assertEquals(checksum, fs.getFileChecksum(TEST_PATH));
-
-      // Restart the NN and DN, after which the client's encryption key will no
-      // longer be valid.
-      cluster.restartNameNode();
-      assertTrue(cluster.restartDataNode(0));
-
-      assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
-      assertEquals(checksum, fs.getFileChecksum(TEST_PATH));
-
-      fs.close();
-    } finally {
-      if (cluster != null) {
-        cluster.shutdown();
-      }
-    }
+    FileChecksum checksum = writeUnencryptedAndThenRestartEncryptedCluster();
+
+    assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
+    assertEquals(checksum, fs.getFileChecksum(TEST_PATH));
+
+    // Restart the NN and DN, after which the client's encryption key will no
+    // longer be valid.
+    cluster.restartNameNode();
+    assertTrue(cluster.restartDataNode(0));
+
+    assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
+    assertEquals(checksum, fs.getFileChecksum(TEST_PATH));
   }
-
+
   @Test
   public void testLongLivedWriteClientAfterRestart() throws IOException {
-    MiniDFSCluster cluster = null;
-    try {
-      Configuration conf = new Configuration();
-      setEncryptionConfigKeys(conf);
-      cluster = new MiniDFSCluster.Builder(conf).build();
-
-      FileSystem fs = getFileSystem(conf);
-
-      writeTestDataToFile(fs);
-      assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
-
-      // Restart the NN and DN, after which the client's encryption key will no
-      // longer be valid.
-      cluster.restartNameNode();
-      assertTrue(cluster.restartDataNodes());
-      cluster.waitActive();
-
-      writeTestDataToFile(fs);
-      assertEquals(PLAIN_TEXT + PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
-
-      fs.close();
-    } finally {
-      if (cluster != null) {
-        cluster.shutdown();
-      }
-    }
+    setEncryptionConfigKeys();
+    cluster = new MiniDFSCluster.Builder(conf).build();
+
+    fs = getFileSystem(conf);
+
+    writeTestDataToFile(fs);
+    assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
+
+    // Restart the NN and DN, after which the client's encryption key will no
+    // longer be valid.
+    cluster.restartNameNode();
+    assertTrue(cluster.restartDataNodes());
+    cluster.waitActive();
+
+    writeTestDataToFile(fs);
+    assertEquals(PLAIN_TEXT + PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
   }
 
   @Test
   public void testLongLivedClient() throws IOException, InterruptedException {
-    MiniDFSCluster cluster = null;
-    try {
-      Configuration conf = new Configuration();
-      cluster = new MiniDFSCluster.Builder(conf).build();
-
-      FileSystem fs = getFileSystem(conf);
-      writeTestDataToFile(fs);
-      assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
-      FileChecksum checksum = fs.getFileChecksum(TEST_PATH);
-      fs.close();
-      cluster.shutdown();
-
-      setEncryptionConfigKeys(conf);
-
-      cluster = new MiniDFSCluster.Builder(conf)
-          .manageDataDfsDirs(false)
-          .manageNameDfsDirs(false)
-          .format(false)
-          .startupOption(StartupOption.REGULAR)
-          .build();
-
-      BlockTokenSecretManager btsm = cluster.getNamesystem().getBlockManager()
-          .getBlockTokenSecretManager();
-      btsm.setKeyUpdateIntervalForTesting(2 * 1000);
-      btsm.setTokenLifetime(2 * 1000);
-      btsm.clearAllKeysForTesting();
-
-      fs = getFileSystem(conf);
-      assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
-      assertEquals(checksum, fs.getFileChecksum(TEST_PATH));
-
-      // Sleep for 15 seconds, after which the encryption key will no longer be
-      // valid. It needs to be a few multiples of the block token lifetime,
-      // since several block tokens are valid at any given time (the current
-      // and the last two, by default.)
-      LOG.info("Sleeping so that encryption keys expire...");
-      Thread.sleep(15 * 1000);
-      LOG.info("Done sleeping.");
-
-      assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
-      assertEquals(checksum, fs.getFileChecksum(TEST_PATH));
-
-      fs.close();
-    } finally {
-      if (cluster != null) {
-        cluster.shutdown();
-      }
-    }
+    FileChecksum checksum = writeUnencryptedAndThenRestartEncryptedCluster();
+
+    BlockTokenSecretManager btsm = cluster.getNamesystem().getBlockManager()
+        .getBlockTokenSecretManager();
+    btsm.setKeyUpdateIntervalForTesting(2 * 1000);
+    btsm.setTokenLifetime(2 * 1000);
+    btsm.clearAllKeysForTesting();
+
+    assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
+    assertEquals(checksum, fs.getFileChecksum(TEST_PATH));
+
+    // Sleep for 15 seconds, after which the encryption key will no longer be
+    // valid. It needs to be a few multiples of the block token lifetime,
+    // since several block tokens are valid at any given time (the current
+    // and the last two, by default.)
+    LOG.info("Sleeping so that encryption keys expire...");
+    Thread.sleep(15 * 1000);
+    LOG.info("Done sleeping.");
+
+    assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
+    assertEquals(checksum, fs.getFileChecksum(TEST_PATH));
   }
-
+
+  @Test
+  public void testLongLivedClientPipelineRecovery()
+      throws IOException, InterruptedException, TimeoutException {
+    if (resolverClazz != null) {
+      // TestTrustedChannelResolver does not use encryption keys.
+      return;
+    }
+    // use 4 datanodes to make sure that after 1 data node is stopped,
+    // client only retries establishing pipeline with the 4th node.
+    int numDataNodes = 4;
+    // do not consider load factor when selecting a data node
+    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY,
+        false);
+    setEncryptionConfigKeys();
+
+    cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(numDataNodes)
+        .build();
+
+    fs = getFileSystem(conf);
+    DFSClient client = DFSClientAdapter.getDFSClient((DistributedFileSystem)fs);
+    DFSClient spyClient = Mockito.spy(client);
+    DFSClientAdapter.setDFSClient((DistributedFileSystem) fs, spyClient);
+    writeTestDataToFile(fs);
+
+    BlockTokenSecretManager btsm = cluster.getNamesystem().getBlockManager()
+        .getBlockTokenSecretManager();
+    // Reduce key update interval and token life for testing.
+    btsm.setKeyUpdateIntervalForTesting(2 * 1000);
+    btsm.setTokenLifetime(2 * 1000);
+    btsm.clearAllKeysForTesting();
+
+    // Wait until the encryption key becomes invalid.
+    LOG.info("Wait until encryption keys become invalid...");
+
+    final DataEncryptionKey encryptionKey = spyClient.getEncryptionKey();
+    List<DataNode> dataNodes = cluster.getDataNodes();
+    for (final DataNode dn: dataNodes) {
+      GenericTestUtils.waitFor(
+          new Supplier<Boolean>() {
+            @Override
+            public Boolean get() {
+              return !dn.getBlockPoolTokenSecretManager().
+                  get(encryptionKey.blockPoolId)
+                  .hasKey(encryptionKey.keyId);
+            }
+          }, 100, 30*1000
+      );
+    }
+    LOG.info("The encryption key is invalid on all nodes now.");
+    try(FSDataOutputStream out = fs.append(TEST_PATH)) {
+      DFSOutputStream dfstream = (DFSOutputStream) out.getWrappedStream();
+      // shut down the first datanode in the pipeline.
+      DatanodeInfo[] targets = dfstream.getPipeline();
+      cluster.stopDataNode(targets[0].getXferAddr());
+      // write data to induce pipeline recovery
+      out.write(PLAIN_TEXT.getBytes());
+      out.hflush();
+      assertFalse("The first datanode in the pipeline was not replaced.",
+          Arrays.asList(dfstream.getPipeline()).contains(targets[0]));
+    }
+    // verify that InvalidEncryptionKeyException is handled properly
+    Mockito.verify(spyClient, times(1)).clearDataEncryptionKey();
+  }
+
   @Test
   public void testEncryptedWriteWithOneDn() throws IOException {
     testEncryptedWrite(1);
   }
-
+
   @Test
   public void testEncryptedWriteWithTwoDns() throws IOException {
     testEncryptedWrite(2);
   }
-
+
   @Test
   public void testEncryptedWriteWithMultipleDns() throws IOException {
     testEncryptedWrite(10);
   }
-
+
   private void testEncryptedWrite(int numDns) throws IOException {
-    MiniDFSCluster cluster = null;
+    setEncryptionConfigKeys();
+
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDns).build();
+
+    fs = getFileSystem(conf);
+
+    LogCapturer logs = GenericTestUtils.LogCapturer.captureLogs(
+        LogFactory.getLog(SaslDataTransferServer.class));
+    LogCapturer logs1 = GenericTestUtils.LogCapturer.captureLogs(
+        LogFactory.getLog(DataTransferSaslUtil.class));
     try {
-      Configuration conf = new Configuration();
-      setEncryptionConfigKeys(conf);
-
-      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDns).build();
-
-      FileSystem fs = getFileSystem(conf);
-
-      LogCapturer logs = GenericTestUtils.LogCapturer.captureLogs(
-          LogFactory.getLog(SaslDataTransferServer.class));
-      LogCapturer logs1 = GenericTestUtils.LogCapturer.captureLogs(
-          LogFactory.getLog(DataTransferSaslUtil.class));
-      try {
-        writeTestDataToFile(fs);
-      } finally {
-        logs.stopCapturing();
-        logs1.stopCapturing();
-      }
-      assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
-      fs.close();
-
-      if (resolverClazz == null) {
-        // Test client and server negotiate cipher option
-        GenericTestUtils.assertDoesNotMatch(logs.getOutput(),
-            "Server using cipher suite");
-        // Check the IOStreamPair
-        GenericTestUtils.assertDoesNotMatch(logs1.getOutput(),
-            "Creating IOStreamPair of CryptoInputStream and CryptoOutputStream.");
-      }
+      writeTestDataToFile(fs);
     } finally {
-      if (cluster != null) {
-        cluster.shutdown();
-      }
+      logs.stopCapturing();
+      logs1.stopCapturing();
+    }
+    assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
+
+    if (resolverClazz == null) {
+      // Test client and server negotiate cipher option
+      GenericTestUtils.assertDoesNotMatch(logs.getOutput(),
+          "Server using cipher suite");
+      // Check the IOStreamPair
+      GenericTestUtils.assertDoesNotMatch(logs1.getOutput(),
+          "Creating IOStreamPair of CryptoInputStream and CryptoOutputStream.");
     }
   }
-
+
   @Test
   public void testEncryptedAppend() throws IOException {
-    MiniDFSCluster cluster = null;
-    try {
-      Configuration conf = new Configuration();
-      setEncryptionConfigKeys(conf);
-
-      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
-
-      FileSystem fs = getFileSystem(conf);
-
-      writeTestDataToFile(fs);
-      assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
-
-      writeTestDataToFile(fs);
-      assertEquals(PLAIN_TEXT + PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
-
-      fs.close();
-    } finally {
-      if (cluster != null) {
-        cluster.shutdown();
-      }
-    }
+    setEncryptionConfigKeys();
+
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
+
+    fs = getFileSystem(conf);
+
+    writeTestDataToFile(fs);
+    assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
+
+    writeTestDataToFile(fs);
+    assertEquals(PLAIN_TEXT + PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
   }
-
+
   @Test
   public void testEncryptedAppendRequiringBlockTransfer() throws IOException {
-    MiniDFSCluster cluster = null;
-    try {
-      Configuration conf = new Configuration();
-      setEncryptionConfigKeys(conf);
-
-      // start up 4 DNs
-      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(4).build();
-
-      FileSystem fs = getFileSystem(conf);
-
-      // Create a file with replication 3, so its block is on 3 / 4 DNs.
-      writeTestDataToFile(fs);
-      assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
-
-      // Shut down one of the DNs holding a block replica.
-      FSDataInputStream in = fs.open(TEST_PATH);
-      List<LocatedBlock> locatedBlocks = DFSTestUtil.getAllBlocks(in);
-      in.close();
-      assertEquals(1, locatedBlocks.size());
-      assertEquals(3, locatedBlocks.get(0).getLocations().length);
-      DataNode dn = cluster.getDataNode(locatedBlocks.get(0).getLocations()[0].getIpcPort());
-      dn.shutdown();
-
-      // Reopen the file for append, which will need to add another DN to the
-      // pipeline and in doing so trigger a block transfer.
-      writeTestDataToFile(fs);
-      assertEquals(PLAIN_TEXT + PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
-
-      fs.close();
-    } finally {
-      if (cluster != null) {
-        cluster.shutdown();
-      }
-    }
+    setEncryptionConfigKeys();
+
+    // start up 4 DNs
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(4).build();
+
+    fs = getFileSystem(conf);
+
+    // Create a file with replication 3, so its block is on 3 / 4 DNs.
+    writeTestDataToFile(fs);
+    assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
+
+    // Shut down one of the DNs holding a block replica.
+    FSDataInputStream in = fs.open(TEST_PATH);
+    List<LocatedBlock> locatedBlocks = DFSTestUtil.getAllBlocks(in);
+    in.close();
+    assertEquals(1, locatedBlocks.size());
+    assertEquals(3, locatedBlocks.get(0).getLocations().length);
+    DataNode dn = cluster.getDataNode(
+        locatedBlocks.get(0).getLocations()[0].getIpcPort());
+    dn.shutdown();
+
+    // Reopen the file for append, which will need to add another DN to the
+    // pipeline and in doing so trigger a block transfer.
+    writeTestDataToFile(fs);
+    assertEquals(PLAIN_TEXT + PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
   }
 
   private static void writeTestDataToFile(FileSystem fs) throws IOException {