HDFS-5910. Enhance DataTransferProtocol to allow per-connection choice of encryption/plain-text. (Contributed by Benoy Antony)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1581688 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
71c2b159ab
commit
1fbb04e367
|
@ -446,6 +446,9 @@ Release 2.4.0 - UNRELEASED
|
|||
HDFS-6124. Add final modifier to class members. (Suresh Srinivas via
|
||||
Arpit Agarwal)
|
||||
|
||||
HDFS-5910. Enhance DataTransferProtocol to allow per-connection choice
|
||||
of encryption/plain-text. (Benoy Antony via Arpit Agarwal)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
HDFS-5790. LeaseManager.findPath is very slow when many leases need recovery
|
||||
|
|
|
@ -153,6 +153,7 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor;
|
|||
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
|
||||
import org.apache.hadoop.hdfs.protocol.datatransfer.Op;
|
||||
import org.apache.hadoop.hdfs.protocol.datatransfer.ReplaceDatanodeOnFailure;
|
||||
import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver;
|
||||
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumResponseProto;
|
||||
|
@ -228,6 +229,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory {
|
|||
private final Random r = new Random();
|
||||
private SocketAddress[] localInterfaceAddrs;
|
||||
private DataEncryptionKey encryptionKey;
|
||||
final TrustedChannelResolver trustedChannelResolver;
|
||||
private final CachingStrategy defaultReadCachingStrategy;
|
||||
private final CachingStrategy defaultWriteCachingStrategy;
|
||||
private final ClientContext clientContext;
|
||||
|
@ -609,6 +611,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory {
|
|||
if (numThreads > 0) {
|
||||
this.initThreadsNumForHedgedReads(numThreads);
|
||||
}
|
||||
this.trustedChannelResolver = TrustedChannelResolver.getInstance(getConfiguration());
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -1813,7 +1816,8 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory {
|
|||
@InterfaceAudience.Private
|
||||
public DataEncryptionKey getDataEncryptionKey()
|
||||
throws IOException {
|
||||
if (shouldEncryptData()) {
|
||||
if (shouldEncryptData() &&
|
||||
!this.trustedChannelResolver.isTrusted()) {
|
||||
synchronized (this) {
|
||||
if (encryptionKey == null ||
|
||||
encryptionKey.expiryDate < Time.now()) {
|
||||
|
|
|
@ -534,6 +534,7 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
|
|||
public static final String DFS_ENCRYPT_DATA_TRANSFER_KEY = "dfs.encrypt.data.transfer";
|
||||
public static final boolean DFS_ENCRYPT_DATA_TRANSFER_DEFAULT = false;
|
||||
public static final String DFS_DATA_ENCRYPTION_ALGORITHM_KEY = "dfs.encrypt.data.transfer.algorithm";
|
||||
public static final String DFS_TRUSTEDCHANNEL_RESOLVER_CLASS = "dfs.trustedchannel.resolver.class";
|
||||
|
||||
// Journal-node related configs. These are read on the JN side.
|
||||
public static final String DFS_JOURNALNODE_EDITS_DIR_KEY = "dfs.journalnode.edits.dir";
|
||||
|
|
|
@ -1038,7 +1038,8 @@ public class DFSOutputStream extends FSOutputSummer
|
|||
|
||||
OutputStream unbufOut = NetUtils.getOutputStream(sock, writeTimeout);
|
||||
InputStream unbufIn = NetUtils.getInputStream(sock);
|
||||
if (dfsClient.shouldEncryptData()) {
|
||||
if (dfsClient.shouldEncryptData() &&
|
||||
!dfsClient.trustedChannelResolver.isTrusted(sock.getInetAddress())) {
|
||||
IOStreamPair encryptedStreams =
|
||||
DataTransferEncryptor.getEncryptedStreams(
|
||||
unbufOut, unbufIn, dfsClient.getDataEncryptionKey());
|
||||
|
@ -1314,7 +1315,8 @@ public class DFSOutputStream extends FSOutputSummer
|
|||
|
||||
OutputStream unbufOut = NetUtils.getOutputStream(s, writeTimeout);
|
||||
InputStream unbufIn = NetUtils.getInputStream(s);
|
||||
if (dfsClient.shouldEncryptData()) {
|
||||
if (dfsClient.shouldEncryptData() &&
|
||||
!dfsClient.trustedChannelResolver.isTrusted(s.getInetAddress())) {
|
||||
IOStreamPair encryptedStreams =
|
||||
DataTransferEncryptor.getEncryptedStreams(unbufOut,
|
||||
unbufIn, dfsClient.getDataEncryptionKey());
|
||||
|
|
|
@ -0,0 +1,81 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.protocol.datatransfer;
|
||||
|
||||
import java.net.InetAddress;
|
||||
|
||||
import org.apache.hadoop.conf.Configurable;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.util.ReflectionUtils;
|
||||
|
||||
/**
|
||||
* Class used to indicate whether a channel is trusted or not.
|
||||
* The default implementation is to return false indicating that
|
||||
* the channel is not trusted.
|
||||
* This class can be overridden to provide custom logic to determine
|
||||
* whether a channel is trusted or not.
|
||||
* The custom class can be specified via configuration.
|
||||
*
|
||||
*/
|
||||
public class TrustedChannelResolver implements Configurable {
|
||||
Configuration conf;
|
||||
|
||||
/**
|
||||
* Returns an instance of TrustedChannelResolver.
|
||||
* Looks up the configuration to see if there is custom class specified.
|
||||
* @param conf
|
||||
* @return TrustedChannelResolver
|
||||
*/
|
||||
public static TrustedChannelResolver getInstance(Configuration conf) {
|
||||
Class<? extends TrustedChannelResolver> clazz =
|
||||
conf.getClass(
|
||||
DFSConfigKeys.DFS_TRUSTEDCHANNEL_RESOLVER_CLASS,
|
||||
TrustedChannelResolver.class, TrustedChannelResolver.class);
|
||||
return ReflectionUtils.newInstance(clazz, conf);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setConf(Configuration conf) {
|
||||
this.conf = conf;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Configuration getConf() {
|
||||
return conf;
|
||||
}
|
||||
|
||||
/**
|
||||
* Return boolean value indicating whether a channel is trusted or not
|
||||
* from a client's perspective.
|
||||
* @return true if the channel is trusted and false otherwise.
|
||||
*/
|
||||
public boolean isTrusted() {
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Identify boolean value indicating whether a channel is trusted or not.
|
||||
* @param peerAddress address of the peer
|
||||
* @return true if the channel is trusted and false otherwise.
|
||||
*/
|
||||
public boolean isTrusted(InetAddress peerAddress) {
|
||||
return false;
|
||||
}
|
||||
}
|
|
@ -35,6 +35,7 @@ import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
|
|||
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
|
||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||
import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
|
||||
import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver;
|
||||
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
|
||||
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
|
||||
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
|
||||
|
@ -71,6 +72,7 @@ class NameNodeConnector {
|
|||
private BlockTokenSecretManager blockTokenSecretManager;
|
||||
private Daemon keyupdaterthread; // AccessKeyUpdater thread
|
||||
private DataEncryptionKey encryptionKey;
|
||||
private final TrustedChannelResolver trustedChannelResolver;
|
||||
|
||||
NameNodeConnector(URI nameNodeUri,
|
||||
Configuration conf) throws IOException {
|
||||
|
@ -120,6 +122,7 @@ class NameNodeConnector {
|
|||
if (out == null) {
|
||||
throw new IOException("Another balancer is running");
|
||||
}
|
||||
this.trustedChannelResolver = TrustedChannelResolver.getInstance(conf);
|
||||
}
|
||||
|
||||
boolean shouldContinue(long dispatchBlockMoveBytes) {
|
||||
|
@ -154,7 +157,7 @@ class NameNodeConnector {
|
|||
|
||||
DataEncryptionKey getDataEncryptionKey()
|
||||
throws IOException {
|
||||
if (encryptDataTransfer) {
|
||||
if (encryptDataTransfer && !this.trustedChannelResolver.isTrusted()) {
|
||||
synchronized (this) {
|
||||
if (encryptionKey == null) {
|
||||
encryptionKey = blockTokenSecretManager.generateDataEncryptionKey();
|
||||
|
|
|
@ -51,6 +51,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_RESTART_REPLICA_
|
|||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver;
|
||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
|
||||
|
||||
/**
|
||||
|
@ -82,6 +83,7 @@ public class DNConf {
|
|||
|
||||
final String minimumNameNodeVersion;
|
||||
final String encryptionAlgorithm;
|
||||
final TrustedChannelResolver trustedChannelResolver;
|
||||
|
||||
final long xceiverStopTimeout;
|
||||
final long restartReplicaExpiry;
|
||||
|
@ -152,6 +154,7 @@ public class DNConf {
|
|||
this.encryptDataTransfer = conf.getBoolean(DFS_ENCRYPT_DATA_TRANSFER_KEY,
|
||||
DFS_ENCRYPT_DATA_TRANSFER_DEFAULT);
|
||||
this.encryptionAlgorithm = conf.get(DFS_DATA_ENCRYPTION_ALGORITHM_KEY);
|
||||
this.trustedChannelResolver = TrustedChannelResolver.getInstance(conf);
|
||||
|
||||
this.xceiverStopTimeout = conf.getLong(
|
||||
DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_KEY,
|
||||
|
|
|
@ -1615,7 +1615,8 @@ public class DataNode extends Configured
|
|||
HdfsServerConstants.WRITE_TIMEOUT_EXTENSION * (targets.length-1);
|
||||
OutputStream unbufOut = NetUtils.getOutputStream(sock, writeTimeout);
|
||||
InputStream unbufIn = NetUtils.getInputStream(sock);
|
||||
if (dnConf.encryptDataTransfer) {
|
||||
if (dnConf.encryptDataTransfer &&
|
||||
!dnConf.trustedChannelResolver.isTrusted(sock.getInetAddress())) {
|
||||
IOStreamPair encryptedStreams =
|
||||
DataTransferEncryptor.getEncryptedStreams(
|
||||
unbufOut, unbufIn,
|
||||
|
|
|
@ -36,9 +36,11 @@ import java.io.IOException;
|
|||
import java.io.InputStream;
|
||||
import java.io.InterruptedIOException;
|
||||
import java.io.OutputStream;
|
||||
import java.net.InetAddress;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.Socket;
|
||||
import java.net.SocketException;
|
||||
import java.net.UnknownHostException;
|
||||
import java.nio.channels.ClosedChannelException;
|
||||
import java.util.Arrays;
|
||||
|
||||
|
@ -81,6 +83,7 @@ import org.apache.hadoop.security.token.SecretManager.InvalidToken;
|
|||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.util.DataChecksum;
|
||||
|
||||
import com.google.common.net.InetAddresses;
|
||||
import com.google.protobuf.ByteString;
|
||||
|
||||
|
||||
|
@ -169,7 +172,8 @@ class DataXceiver extends Receiver implements Runnable {
|
|||
dataXceiverServer.addPeer(peer, Thread.currentThread());
|
||||
peer.setWriteTimeout(datanode.getDnConf().socketWriteTimeout);
|
||||
InputStream input = socketIn;
|
||||
if ((!peer.hasSecureChannel()) && dnConf.encryptDataTransfer) {
|
||||
if ((!peer.hasSecureChannel()) && dnConf.encryptDataTransfer &&
|
||||
!dnConf.trustedChannelResolver.isTrusted(getClientAddress(peer))){
|
||||
IOStreamPair encryptedStreams = null;
|
||||
try {
|
||||
encryptedStreams = DataTransferEncryptor.getEncryptedStreams(socketOut,
|
||||
|
@ -258,6 +262,19 @@ class DataXceiver extends Receiver implements Runnable {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns InetAddress from peer
|
||||
* The getRemoteAddressString is the form /ip-address:port
|
||||
* The ip-address is extracted from peer and InetAddress is formed
|
||||
* @param peer
|
||||
* @return
|
||||
* @throws UnknownHostException
|
||||
*/
|
||||
private static InetAddress getClientAddress(Peer peer) {
|
||||
return InetAddresses.forString(
|
||||
peer.getRemoteAddressString().split(":")[0].substring(1));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void requestShortCircuitFds(final ExtendedBlock blk,
|
||||
final Token<BlockTokenIdentifier> token,
|
||||
|
@ -637,7 +654,8 @@ class DataXceiver extends Receiver implements Runnable {
|
|||
OutputStream unbufMirrorOut = NetUtils.getOutputStream(mirrorSock,
|
||||
writeTimeout);
|
||||
InputStream unbufMirrorIn = NetUtils.getInputStream(mirrorSock);
|
||||
if (dnConf.encryptDataTransfer) {
|
||||
if (dnConf.encryptDataTransfer &&
|
||||
!dnConf.trustedChannelResolver.isTrusted(mirrorSock.getInetAddress())) {
|
||||
IOStreamPair encryptedStreams =
|
||||
DataTransferEncryptor.getEncryptedStreams(
|
||||
unbufMirrorOut, unbufMirrorIn,
|
||||
|
@ -963,7 +981,9 @@ class DataXceiver extends Receiver implements Runnable {
|
|||
OutputStream unbufProxyOut = NetUtils.getOutputStream(proxySock,
|
||||
dnConf.socketWriteTimeout);
|
||||
InputStream unbufProxyIn = NetUtils.getInputStream(proxySock);
|
||||
if (dnConf.encryptDataTransfer) {
|
||||
if (dnConf.encryptDataTransfer &&
|
||||
!dnConf.trustedChannelResolver.isTrusted(
|
||||
proxySock.getInetAddress())) {
|
||||
IOStreamPair encryptedStreams =
|
||||
DataTransferEncryptor.getEncryptedStreams(
|
||||
unbufProxyOut, unbufProxyIn,
|
||||
|
|
|
@ -1338,7 +1338,8 @@
|
|||
<description>
|
||||
Whether or not actual block data that is read/written from/to HDFS should
|
||||
be encrypted on the wire. This only needs to be set on the NN and DNs,
|
||||
clients will deduce this automatically.
|
||||
clients will deduce this automatically. It is possible to override this setting
|
||||
per connection by specifying custom logic via dfs.trustedchannel.resolver.class.
|
||||
</description>
|
||||
</property>
|
||||
|
||||
|
@ -1353,6 +1354,20 @@
|
|||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>dfs.trustedchannel.resolver.class</name>
|
||||
<value></value>
|
||||
<description>
|
||||
TrustedChannelResolver is used to determine whether a channel
|
||||
is trusted for plain data transfer. The TrustedChannelResolver is
|
||||
invoked on both client and server side. If the resolver indicates
|
||||
that the channel is trusted, then the data transfer will not be
|
||||
encrypted even if dfs.encrypt.data.transfer is set to true. The
|
||||
default implementation returns false indicating that the channel
|
||||
is not trusted.
|
||||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>dfs.datanode.hdfs-blocks-metadata.enabled</name>
|
||||
<value>false</value>
|
||||
|
|
|
@ -23,6 +23,9 @@ import static org.junit.Assert.fail;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
import java.net.InetAddress;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
|
@ -33,24 +36,40 @@ import org.apache.hadoop.fs.FileChecksum;
|
|||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||
import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver;
|
||||
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
|
||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.apache.hadoop.test.GenericTestUtils.LogCapturer;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
import org.junit.runners.Parameterized.Parameters;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
@RunWith(Parameterized.class)
|
||||
public class TestEncryptedTransfer {
|
||||
|
||||
@Parameters
|
||||
public static Collection<Object[]> data() {
|
||||
Collection<Object[]> params = new ArrayList<Object[]>();
|
||||
params.add(new Object[]{null});
|
||||
params.add(new Object[]{"org.apache.hadoop.hdfs.TestEncryptedTransfer$TestTrustedChannelResolver"});
|
||||
return params;
|
||||
}
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(TestEncryptedTransfer.class);
|
||||
|
||||
private static final String PLAIN_TEXT = "this is very secret plain text";
|
||||
private static final Path TEST_PATH = new Path("/non-encrypted-file");
|
||||
|
||||
private static void setEncryptionConfigKeys(Configuration conf) {
|
||||
private void setEncryptionConfigKeys(Configuration conf) {
|
||||
conf.setBoolean(DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY, true);
|
||||
conf.setBoolean(DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);
|
||||
if (resolverClazz != null){
|
||||
conf.set(DFSConfigKeys.DFS_TRUSTEDCHANNEL_RESOLVER_CLASS, resolverClazz);
|
||||
}
|
||||
}
|
||||
|
||||
// Unset DFS_ENCRYPT_DATA_TRANSFER_KEY and DFS_DATA_ENCRYPTION_ALGORITHM_KEY
|
||||
|
@ -63,6 +82,11 @@ public class TestEncryptedTransfer {
|
|||
return FileSystem.get(localConf);
|
||||
}
|
||||
|
||||
String resolverClazz;
|
||||
public TestEncryptedTransfer(String resolverClazz){
|
||||
this.resolverClazz = resolverClazz;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEncryptedRead() throws IOException {
|
||||
MiniDFSCluster cluster = null;
|
||||
|
@ -206,7 +230,9 @@ public class TestEncryptedTransfer {
|
|||
LogFactory.getLog(DataNode.class));
|
||||
try {
|
||||
assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
|
||||
fail("Should not have been able to read without encryption enabled.");
|
||||
if (resolverClazz != null && !resolverClazz.endsWith("TestTrustedChannelResolver")){
|
||||
fail("Should not have been able to read without encryption enabled.");
|
||||
}
|
||||
} catch (IOException ioe) {
|
||||
GenericTestUtils.assertExceptionContains("Could not obtain block:",
|
||||
ioe);
|
||||
|
@ -215,8 +241,10 @@ public class TestEncryptedTransfer {
|
|||
}
|
||||
fs.close();
|
||||
|
||||
GenericTestUtils.assertMatches(logs.getOutput(),
|
||||
"Failed to read expected encryption handshake from client at");
|
||||
if (resolverClazz != null && !resolverClazz.endsWith("TestTrustedChannelResolver")){
|
||||
GenericTestUtils.assertMatches(logs.getOutput(),
|
||||
"Failed to read expected encryption handshake from client at");
|
||||
}
|
||||
} finally {
|
||||
if (cluster != null) {
|
||||
cluster.shutdown();
|
||||
|
@ -456,4 +484,16 @@ public class TestEncryptedTransfer {
|
|||
out.write(PLAIN_TEXT.getBytes());
|
||||
out.close();
|
||||
}
|
||||
|
||||
static class TestTrustedChannelResolver extends TrustedChannelResolver {
|
||||
|
||||
public boolean isTrusted(){
|
||||
return true;
|
||||
}
|
||||
|
||||
public boolean isTrusted(InetAddress peerAddress){
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue