HDFS-10609. Uncaught InvalidEncryptionKeyException during pipeline recovery may abort downstream applications. Contributed by Wei-Chiu Chuang.

(cherry picked from commit 3ae652f821)
(cherry picked from commit bde787db23)
Wei-Chiu Chuang 2016-09-26 14:44:48 -07:00
parent 09964a1629
commit f0f3b6a66a
6 changed files with 440 additions and 477 deletions
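In outline, the fix makes the DataStreamer's block-transfer step catch InvalidEncryptionKeyException, clear the client's cached data encryption key, and retry once with a freshly fetched key, instead of letting the exception escape the pipeline-recovery path and abort the writer. A minimal, self-contained sketch of that control flow (the names and interfaces here are illustrative stand-ins, not the patch itself; the actual change is in the DataStreamer hunks below):

import java.io.IOException;

public class RefetchKeyRetrySketch {
  // Stand-in for org.apache.hadoop.hdfs.security.token.block.InvalidEncryptionKeyException.
  static class InvalidEncryptionKeyException extends IOException {}

  // Hypothetical client interface used only for this illustration.
  interface TransferClient {
    void clearDataEncryptionKey();          // forces a fresh key on the next attempt
    void transferBlockOnce() throws IOException;
  }

  static void transferWithRetry(TransferClient client) throws IOException {
    int failures = 0;
    InvalidEncryptionKeyException last = null;
    do {
      try {
        client.transferBlockOnce();
        return;                             // success, no retry needed
      } catch (InvalidEncryptionKeyException e) {
        failures++;
        last = e;
      }
      if (failures >= 2) {
        throw last;                         // failed twice against the same node: give up
      }
      client.clearDataEncryptionKey();      // refetch the key and retry once
    } while (true);
  }
}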


@@ -1772,6 +1772,11 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
}
}
@VisibleForTesting
public DataEncryptionKey getEncryptionKey() {
return encryptionKey;
}
/**
* Get the checksum of the whole file or a range of the file. Note that the
* range always starts from the beginning of the file.


@@ -124,6 +124,89 @@ import javax.annotation.Nonnull;
class DataStreamer extends Daemon {
static final Logger LOG = LoggerFactory.getLogger(DataStreamer.class);
private class RefetchEncryptionKeyPolicy {
private int fetchEncryptionKeyTimes = 0;
private InvalidEncryptionKeyException lastException;
private final DatanodeInfo src;
RefetchEncryptionKeyPolicy(DatanodeInfo src) {
this.src = src;
}
boolean continueRetryingOrThrow() throws InvalidEncryptionKeyException {
if (fetchEncryptionKeyTimes >= 2) {
// hit the same exception twice connecting to the node, so
// throw the exception and exclude the node.
throw lastException;
}
// Don't exclude this node just yet.
// Try again with a new encryption key.
LOG.info("Will fetch a new encryption key and retry, "
+ "encryption key was invalid when connecting to "
+ this.src + ": ", lastException);
// The encryption key used is invalid.
dfsClient.clearDataEncryptionKey();
return true;
}
/**
* Record a connection exception.
* @param e
* @throws InvalidEncryptionKeyException
*/
void recordFailure(final InvalidEncryptionKeyException e)
throws InvalidEncryptionKeyException {
fetchEncryptionKeyTimes++;
lastException = e;
}
}
private class StreamerStreams implements java.io.Closeable {
private Socket sock = null;
private DataOutputStream out = null;
private DataInputStream in = null;
StreamerStreams(final DatanodeInfo src,
final long writeTimeout, final long readTimeout,
final Token<BlockTokenIdentifier> blockToken)
throws IOException {
sock = createSocketForPipeline(src, 2, dfsClient);
OutputStream unbufOut = NetUtils.getOutputStream(sock, writeTimeout);
InputStream unbufIn = NetUtils.getInputStream(sock, readTimeout);
IOStreamPair saslStreams = dfsClient.saslClient
.socketSend(sock, unbufOut, unbufIn, dfsClient, blockToken, src);
unbufOut = saslStreams.out;
unbufIn = saslStreams.in;
out = new DataOutputStream(new BufferedOutputStream(unbufOut,
DFSUtilClient.getSmallBufferSize(dfsClient.getConfiguration())));
in = new DataInputStream(unbufIn);
}
void sendTransferBlock(final DatanodeInfo[] targets,
final StorageType[] targetStorageTypes,
final Token<BlockTokenIdentifier> blockToken) throws IOException {
//send the TRANSFER_BLOCK request
new Sender(out)
.transferBlock(block, blockToken, dfsClient.clientName, targets,
targetStorageTypes);
out.flush();
//ack
BlockOpResponseProto transferResponse = BlockOpResponseProto
.parseFrom(PBHelperClient.vintPrefixed(in));
if (SUCCESS != transferResponse.getStatus()) {
throw new IOException("Failed to add a datanode. Response status: "
+ transferResponse.getStatus());
}
}
@Override
public void close() throws IOException {
IOUtils.closeStream(in);
IOUtils.closeStream(out);
IOUtils.closeSocket(sock);
}
}
/**
* Create a socket for a write pipeline
*
@@ -1237,50 +1320,39 @@ class DataStreamer extends Daemon {
new IOException("Failed to add a node");
}
private long computeTransferWriteTimeout() {
return dfsClient.getDatanodeWriteTimeout(2);
}
private long computeTransferReadTimeout() {
// transfer timeout multiplier based on the transfer size
// One per 200 packets = 12.8MB. Minimum is 2.
int multi = 2
+ (int) (bytesSent / dfsClient.getConf().getWritePacketSize()) / 200;
return dfsClient.getDatanodeReadTimeout(multi);
}
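For context only (this is not part of the patch): a rough worked example of the read-timeout multiplier above, assuming the default 64 KB client write packet size (dfs.client-write-packet-size). With 32 MB already sent, 32 MB / 64 KB = 512 packets, 512 / 200 = 2 by integer division, so the multiplier is 2 + 2 = 4 and the transfer read timeout is four times the per-datanode base read timeout.

public class TransferReadTimeoutExample {
  public static void main(String[] args) {
    long bytesSent = 32L * 1024 * 1024;  // assume 32 MB already sent
    long packetSize = 64 * 1024;         // assumed default dfs.client-write-packet-size
    int multi = 2 + (int) (bytesSent / packetSize) / 200;
    System.out.println(multi);           // prints 4
  }
}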
private void transfer(final DatanodeInfo src, final DatanodeInfo[] targets,
final StorageType[] targetStorageTypes,
final Token<BlockTokenIdentifier> blockToken)
throws IOException {
//transfer replica to the new datanode
RefetchEncryptionKeyPolicy policy = new RefetchEncryptionKeyPolicy(src);
do {
StreamerStreams streams = null;
try {
final long writeTimeout = computeTransferWriteTimeout();
final long readTimeout = computeTransferReadTimeout();
streams = new StreamerStreams(src, writeTimeout, readTimeout,
blockToken);
streams.sendTransferBlock(targets, targetStorageTypes, blockToken);
return;
} catch (InvalidEncryptionKeyException e) {
policy.recordFailure(e);
} finally {
IOUtils.closeStream(streams);
}
} while (policy.continueRetryingOrThrow());
}

/**


@@ -49,7 +49,8 @@ public class BlockPoolTokenSecretManager extends
map.put(bpid, secretMgr);
}

@VisibleForTesting
public synchronized BlockTokenSecretManager get(String bpid) {
BlockTokenSecretManager secretMgr = map.get(bpid);
if (secretMgr == null) {
throw new IllegalArgumentException("Block pool " + bpid


@@ -436,6 +436,12 @@ public class BlockTokenSecretManager extends
allKeys.clear();
}
@VisibleForTesting
public synchronized boolean hasKey(int keyId) {
BlockKey key = allKeys.get(keyId);
return key != null;
}
@VisibleForTesting
public synchronized int getSerialNoForTesting() {
return serialNo;


@@ -2708,6 +2708,11 @@ public class DataNode extends ReconfigurableBase
return directoryScanner;
}
@VisibleForTesting
public BlockPoolTokenSecretManager getBlockPoolTokenSecretManager() {
return blockPoolTokenSecretManager;
}
public static void secureMain(String args[], SecureResources resources) {
int errorCode = 0;
try {


@@ -18,24 +18,31 @@
package org.apache.hadoop.hdfs;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.times;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.TimeoutException;
import com.google.common.base.Supplier;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileChecksum;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver;
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil;
@@ -45,9 +52,14 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.GenericTestUtils.LogCapturer;
import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
@@ -59,6 +71,9 @@ public class TestEncryptedTransfer {
LogManager.getLogger(SaslDataTransferServer.class).setLevel(Level.DEBUG);
LogManager.getLogger(DataTransferSaslUtil.class).setLevel(Level.DEBUG);
}
@Rule
public Timeout timeout = new Timeout(300000);
@Parameters
public static Collection<Object[]> data() {
@@ -72,8 +87,12 @@
private static final String PLAIN_TEXT = "this is very secret plain text";
private static final Path TEST_PATH = new Path("/non-encrypted-file");
private MiniDFSCluster cluster = null;
private Configuration conf = null;
private FileSystem fs = null;
private void setEncryptionConfigKeys() {
conf.setBoolean(DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY, true);
conf.setBoolean(DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);
if (resolverClazz != null){
@@ -96,505 +115,360 @@
this.resolverClazz = resolverClazz;
}

@Before
public void setup() throws IOException {
conf = new Configuration();
}

@After
public void teardown() throws IOException {
if (fs != null) {
fs.close();
}
if (cluster != null) {
cluster.shutdown();
}
}

private FileChecksum writeUnencryptedAndThenRestartEncryptedCluster()
throws IOException {
cluster = new MiniDFSCluster.Builder(conf).build();
fs = getFileSystem(conf);
writeTestDataToFile(fs);
assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
FileChecksum checksum = fs.getFileChecksum(TEST_PATH);
fs.close();
cluster.shutdown();

setEncryptionConfigKeys();

cluster = new MiniDFSCluster.Builder(conf)
.manageDataDfsDirs(false)
.manageNameDfsDirs(false)
.format(false)
.startupOption(StartupOption.REGULAR)
.build();

fs = getFileSystem(conf);
return checksum;
}

private void testEncryptedRead(String algorithm, String cipherSuite,
boolean matchLog, boolean readAfterRestart)
throws IOException {
// set encryption algorithm and cipher suites, but don't enable transfer
// encryption yet.
conf.set(DFSConfigKeys.DFS_DATA_ENCRYPTION_ALGORITHM_KEY, algorithm);
conf.set(HdfsClientConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY,
cipherSuite);

FileChecksum checksum = writeUnencryptedAndThenRestartEncryptedCluster();
LogCapturer logs = GenericTestUtils.LogCapturer.captureLogs(
LogFactory.getLog(SaslDataTransferServer.class));
LogCapturer logs1 = GenericTestUtils.LogCapturer.captureLogs(
LogFactory.getLog(DataTransferSaslUtil.class));
try {
assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
assertEquals(checksum, fs.getFileChecksum(TEST_PATH));
} finally {
logs.stopCapturing();
logs1.stopCapturing();
}

if (resolverClazz == null) {
if (matchLog) {
// Test client and server negotiate cipher option
GenericTestUtils
.assertMatches(logs.getOutput(), "Server using cipher suite");
// Check the IOStreamPair
GenericTestUtils.assertMatches(logs1.getOutput(),
"Creating IOStreamPair of CryptoInputStream and CryptoOutputStream.");
} else {
// Test client and server negotiate cipher option
GenericTestUtils
.assertDoesNotMatch(logs.getOutput(), "Server using cipher suite");
// Check the IOStreamPair
GenericTestUtils.assertDoesNotMatch(logs1.getOutput(),
"Creating IOStreamPair of CryptoInputStream and CryptoOutputStream.");
}
}
if (readAfterRestart) {
cluster.restartNameNode();
fs = getFileSystem(conf);
assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
assertEquals(checksum, fs.getFileChecksum(TEST_PATH));
}
}

@Test
public void testEncryptedReadDefaultAlgorithmCipherSuite()
throws IOException {
testEncryptedRead("", "", false, false);
}

@Test
public void testEncryptedReadWithRC4() throws IOException {
testEncryptedRead("rc4", "", false, false);
}

@Test
public void testEncryptedReadWithAES() throws IOException {
testEncryptedRead("", "AES/CTR/NoPadding", true, false);
}

@Test
public void testEncryptedReadAfterNameNodeRestart() throws IOException {
testEncryptedRead("", "", false, true);
}

@Test
public void testClientThatDoesNotSupportEncryption() throws IOException {
// Set short retry timeouts so this test runs faster
conf.setInt(HdfsClientConfigKeys.Retry.WINDOW_BASE_KEY, 10);
writeUnencryptedAndThenRestartEncryptedCluster();

DFSClient client = DFSClientAdapter.getDFSClient((DistributedFileSystem)fs);
DFSClient spyClient = Mockito.spy(client);
Mockito.doReturn(false).when(spyClient).shouldEncryptData();
DFSClientAdapter.setDFSClient((DistributedFileSystem) fs, spyClient);

LogCapturer logs = GenericTestUtils.LogCapturer.captureLogs(
LogFactory.getLog(DataNode.class));
try {
assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
if (resolverClazz != null &&
!resolverClazz.endsWith("TestTrustedChannelResolver")){
fail("Should not have been able to read without encryption enabled.");
}
} catch (IOException ioe) {
GenericTestUtils.assertExceptionContains("Could not obtain block:",
ioe);
} finally {
logs.stopCapturing();
}

if (resolverClazz == null) {
GenericTestUtils.assertMatches(logs.getOutput(),
"Failed to read expected encryption handshake from client at");
}
}

@Test
public void testLongLivedReadClientAfterRestart() throws IOException {
FileChecksum checksum = writeUnencryptedAndThenRestartEncryptedCluster();

assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
assertEquals(checksum, fs.getFileChecksum(TEST_PATH));

// Restart the NN and DN, after which the client's encryption key will no
// longer be valid.
cluster.restartNameNode();
assertTrue(cluster.restartDataNode(0));

assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
assertEquals(checksum, fs.getFileChecksum(TEST_PATH));
}

@Test
public void testLongLivedWriteClientAfterRestart() throws IOException {
setEncryptionConfigKeys();
cluster = new MiniDFSCluster.Builder(conf).build();

fs = getFileSystem(conf);

writeTestDataToFile(fs);
assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));

// Restart the NN and DN, after which the client's encryption key will no
// longer be valid.
cluster.restartNameNode();
assertTrue(cluster.restartDataNodes());
cluster.waitActive();

writeTestDataToFile(fs);
assertEquals(PLAIN_TEXT + PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
}

@Test
public void testLongLivedClient() throws IOException, InterruptedException {
FileChecksum checksum = writeUnencryptedAndThenRestartEncryptedCluster();

BlockTokenSecretManager btsm = cluster.getNamesystem().getBlockManager()
.getBlockTokenSecretManager();
btsm.setKeyUpdateIntervalForTesting(2 * 1000);
btsm.setTokenLifetime(2 * 1000);
btsm.clearAllKeysForTesting();

assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
assertEquals(checksum, fs.getFileChecksum(TEST_PATH));

// Sleep for 15 seconds, after which the encryption key will no longer be
// valid. It needs to be a few multiples of the block token lifetime,
// since several block tokens are valid at any given time (the current
// and the last two, by default.)
LOG.info("Sleeping so that encryption keys expire...");
Thread.sleep(15 * 1000);
LOG.info("Done sleeping.");

assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
assertEquals(checksum, fs.getFileChecksum(TEST_PATH));
}

@Test
public void testLongLivedClientPipelineRecovery()
throws IOException, InterruptedException, TimeoutException {
if (resolverClazz != null) {
// TestTrustedChannelResolver does not use encryption keys.
return;
}
// use 4 datanodes to make sure that after 1 data node is stopped,
// client only retries establishing pipeline with the 4th node.
int numDataNodes = 4;
// do not consider load factor when selecting a data node
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY,
false);
setEncryptionConfigKeys();
cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(numDataNodes)
.build();
fs = getFileSystem(conf);
DFSClient client = DFSClientAdapter.getDFSClient((DistributedFileSystem)fs);
DFSClient spyClient = Mockito.spy(client);
DFSClientAdapter.setDFSClient((DistributedFileSystem) fs, spyClient);
writeTestDataToFile(fs);
BlockTokenSecretManager btsm = cluster.getNamesystem().getBlockManager()
.getBlockTokenSecretManager();
// Reduce key update interval and token life for testing.
btsm.setKeyUpdateIntervalForTesting(2 * 1000);
btsm.setTokenLifetime(2 * 1000);
btsm.clearAllKeysForTesting();
// Wait until the encryption key becomes invalid.
LOG.info("Wait until encryption keys become invalid...");
final DataEncryptionKey encryptionKey = spyClient.getEncryptionKey();
List<DataNode> dataNodes = cluster.getDataNodes();
for (final DataNode dn: dataNodes) {
GenericTestUtils.waitFor(
new Supplier<Boolean>() {
@Override
public Boolean get() {
return !dn.getBlockPoolTokenSecretManager().
get(encryptionKey.blockPoolId)
.hasKey(encryptionKey.keyId);
}
}, 100, 30*1000
);
}
LOG.info("The encryption key is invalid on all nodes now.");
try(FSDataOutputStream out = fs.append(TEST_PATH)) {
DFSOutputStream dfstream = (DFSOutputStream) out.getWrappedStream();
// shut down the first datanode in the pipeline.
DatanodeInfo[] targets = dfstream.getPipeline();
cluster.stopDataNode(targets[0].getXferAddr());
// write data to induce pipeline recovery
out.write(PLAIN_TEXT.getBytes());
out.hflush();
assertFalse("The first datanode in the pipeline was not replaced.",
Arrays.asList(dfstream.getPipeline()).contains(targets[0]));
}
// verify that InvalidEncryptionKeyException is handled properly
Mockito.verify(spyClient, times(1)).clearDataEncryptionKey();
}
@Test
public void testEncryptedWriteWithOneDn() throws IOException {
testEncryptedWrite(1);
}

@Test
public void testEncryptedWriteWithTwoDns() throws IOException {
testEncryptedWrite(2);
}

@Test
public void testEncryptedWriteWithMultipleDns() throws IOException {
testEncryptedWrite(10);
}

private void testEncryptedWrite(int numDns) throws IOException {
setEncryptionConfigKeys();

cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDns).build();

fs = getFileSystem(conf);

LogCapturer logs = GenericTestUtils.LogCapturer.captureLogs(
LogFactory.getLog(SaslDataTransferServer.class));
LogCapturer logs1 = GenericTestUtils.LogCapturer.captureLogs(
LogFactory.getLog(DataTransferSaslUtil.class));
try {
writeTestDataToFile(fs);
} finally {
logs.stopCapturing();
logs1.stopCapturing();
}
assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));

if (resolverClazz == null) {
// Test client and server negotiate cipher option
GenericTestUtils.assertDoesNotMatch(logs.getOutput(),
"Server using cipher suite");
// Check the IOStreamPair
GenericTestUtils.assertDoesNotMatch(logs1.getOutput(),
"Creating IOStreamPair of CryptoInputStream and CryptoOutputStream.");
}
}

@Test
public void testEncryptedAppend() throws IOException {
setEncryptionConfigKeys();

cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();

fs = getFileSystem(conf);

writeTestDataToFile(fs);
assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));

writeTestDataToFile(fs);
assertEquals(PLAIN_TEXT + PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
}

@Test
public void testEncryptedAppendRequiringBlockTransfer() throws IOException {
setEncryptionConfigKeys();

// start up 4 DNs
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(4).build();

fs = getFileSystem(conf);

// Create a file with replication 3, so its block is on 3 / 4 DNs.
writeTestDataToFile(fs);
assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));

// Shut down one of the DNs holding a block replica.
FSDataInputStream in = fs.open(TEST_PATH);
List<LocatedBlock> locatedBlocks = DFSTestUtil.getAllBlocks(in);
in.close();
assertEquals(1, locatedBlocks.size());
assertEquals(3, locatedBlocks.get(0).getLocations().length);
DataNode dn = cluster.getDataNode(
locatedBlocks.get(0).getLocations()[0].getIpcPort());
dn.shutdown();

// Reopen the file for append, which will need to add another DN to the
// pipeline and in doing so trigger a block transfer.
writeTestDataToFile(fs);
assertEquals(PLAIN_TEXT + PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
}

private static void writeTestDataToFile(FileSystem fs) throws IOException {