HDFS-5910: Merging r1581688 from trunk to branch-2.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1581690 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
859f27040d
commit
38d0fb9238
|
@ -204,6 +204,9 @@ Release 2.4.0 - UNRELEASED
|
||||||
HDFS-6124. Add final modifier to class members. (Suresh Srinivas via
|
HDFS-6124. Add final modifier to class members. (Suresh Srinivas via
|
||||||
Arpit Agarwal)
|
Arpit Agarwal)
|
||||||
|
|
||||||
|
HDFS-5910. Enhance DataTransferProtocol to allow per-connection choice
|
||||||
|
of encryption/plain-text. (Benoy Antony via Arpit Agarwal)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
HDFS-5790. LeaseManager.findPath is very slow when many leases need recovery
|
HDFS-5790. LeaseManager.findPath is very slow when many leases need recovery
|
||||||
|
|
|
@ -155,6 +155,7 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor;
|
||||||
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
|
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
|
||||||
import org.apache.hadoop.hdfs.protocol.datatransfer.Op;
|
import org.apache.hadoop.hdfs.protocol.datatransfer.Op;
|
||||||
import org.apache.hadoop.hdfs.protocol.datatransfer.ReplaceDatanodeOnFailure;
|
import org.apache.hadoop.hdfs.protocol.datatransfer.ReplaceDatanodeOnFailure;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver;
|
||||||
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
|
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
|
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumResponseProto;
|
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumResponseProto;
|
||||||
|
@ -230,6 +231,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory {
|
||||||
private final Random r = new Random();
|
private final Random r = new Random();
|
||||||
private SocketAddress[] localInterfaceAddrs;
|
private SocketAddress[] localInterfaceAddrs;
|
||||||
private DataEncryptionKey encryptionKey;
|
private DataEncryptionKey encryptionKey;
|
||||||
|
final TrustedChannelResolver trustedChannelResolver;
|
||||||
private final CachingStrategy defaultReadCachingStrategy;
|
private final CachingStrategy defaultReadCachingStrategy;
|
||||||
private final CachingStrategy defaultWriteCachingStrategy;
|
private final CachingStrategy defaultWriteCachingStrategy;
|
||||||
private final ClientContext clientContext;
|
private final ClientContext clientContext;
|
||||||
|
@ -611,6 +613,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory {
|
||||||
if (numThreads > 0) {
|
if (numThreads > 0) {
|
||||||
this.initThreadsNumForHedgedReads(numThreads);
|
this.initThreadsNumForHedgedReads(numThreads);
|
||||||
}
|
}
|
||||||
|
this.trustedChannelResolver = TrustedChannelResolver.getInstance(getConfiguration());
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -1831,7 +1834,8 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory {
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public DataEncryptionKey getDataEncryptionKey()
|
public DataEncryptionKey getDataEncryptionKey()
|
||||||
throws IOException {
|
throws IOException {
|
||||||
if (shouldEncryptData()) {
|
if (shouldEncryptData() &&
|
||||||
|
!this.trustedChannelResolver.isTrusted()) {
|
||||||
synchronized (this) {
|
synchronized (this) {
|
||||||
if (encryptionKey == null ||
|
if (encryptionKey == null ||
|
||||||
encryptionKey.expiryDate < Time.now()) {
|
encryptionKey.expiryDate < Time.now()) {
|
||||||
|
|
|
@ -536,6 +536,7 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
|
||||||
public static final String DFS_ENCRYPT_DATA_TRANSFER_KEY = "dfs.encrypt.data.transfer";
|
public static final String DFS_ENCRYPT_DATA_TRANSFER_KEY = "dfs.encrypt.data.transfer";
|
||||||
public static final boolean DFS_ENCRYPT_DATA_TRANSFER_DEFAULT = false;
|
public static final boolean DFS_ENCRYPT_DATA_TRANSFER_DEFAULT = false;
|
||||||
public static final String DFS_DATA_ENCRYPTION_ALGORITHM_KEY = "dfs.encrypt.data.transfer.algorithm";
|
public static final String DFS_DATA_ENCRYPTION_ALGORITHM_KEY = "dfs.encrypt.data.transfer.algorithm";
|
||||||
|
public static final String DFS_TRUSTEDCHANNEL_RESOLVER_CLASS = "dfs.trustedchannel.resolver.class";
|
||||||
|
|
||||||
// Journal-node related configs. These are read on the JN side.
|
// Journal-node related configs. These are read on the JN side.
|
||||||
public static final String DFS_JOURNALNODE_EDITS_DIR_KEY = "dfs.journalnode.edits.dir";
|
public static final String DFS_JOURNALNODE_EDITS_DIR_KEY = "dfs.journalnode.edits.dir";
|
||||||
|
|
|
@ -1043,7 +1043,8 @@ public class DFSOutputStream extends FSOutputSummer
|
||||||
|
|
||||||
OutputStream unbufOut = NetUtils.getOutputStream(sock, writeTimeout);
|
OutputStream unbufOut = NetUtils.getOutputStream(sock, writeTimeout);
|
||||||
InputStream unbufIn = NetUtils.getInputStream(sock);
|
InputStream unbufIn = NetUtils.getInputStream(sock);
|
||||||
if (dfsClient.shouldEncryptData()) {
|
if (dfsClient.shouldEncryptData() &&
|
||||||
|
!dfsClient.trustedChannelResolver.isTrusted(sock.getInetAddress())) {
|
||||||
IOStreamPair encryptedStreams =
|
IOStreamPair encryptedStreams =
|
||||||
DataTransferEncryptor.getEncryptedStreams(
|
DataTransferEncryptor.getEncryptedStreams(
|
||||||
unbufOut, unbufIn, dfsClient.getDataEncryptionKey());
|
unbufOut, unbufIn, dfsClient.getDataEncryptionKey());
|
||||||
|
@ -1319,7 +1320,8 @@ public class DFSOutputStream extends FSOutputSummer
|
||||||
|
|
||||||
OutputStream unbufOut = NetUtils.getOutputStream(s, writeTimeout);
|
OutputStream unbufOut = NetUtils.getOutputStream(s, writeTimeout);
|
||||||
InputStream unbufIn = NetUtils.getInputStream(s);
|
InputStream unbufIn = NetUtils.getInputStream(s);
|
||||||
if (dfsClient.shouldEncryptData()) {
|
if (dfsClient.shouldEncryptData() &&
|
||||||
|
!dfsClient.trustedChannelResolver.isTrusted(s.getInetAddress())) {
|
||||||
IOStreamPair encryptedStreams =
|
IOStreamPair encryptedStreams =
|
||||||
DataTransferEncryptor.getEncryptedStreams(unbufOut,
|
DataTransferEncryptor.getEncryptedStreams(unbufOut,
|
||||||
unbufIn, dfsClient.getDataEncryptionKey());
|
unbufIn, dfsClient.getDataEncryptionKey());
|
||||||
|
|
|
@ -0,0 +1,81 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hdfs.protocol.datatransfer;
|
||||||
|
|
||||||
|
import java.net.InetAddress;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configurable;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
|
import org.apache.hadoop.util.ReflectionUtils;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Class used to indicate whether a channel is trusted or not.
|
||||||
|
* The default implementation is to return false indicating that
|
||||||
|
* the channel is not trusted.
|
||||||
|
* This class can be overridden to provide custom logic to determine
|
||||||
|
* whether a channel is trusted or not.
|
||||||
|
* The custom class can be specified via configuration.
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
public class TrustedChannelResolver implements Configurable {
|
||||||
|
Configuration conf;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns an instance of TrustedChannelResolver.
|
||||||
|
* Looks up the configuration to see if there is custom class specified.
|
||||||
|
* @param conf
|
||||||
|
* @return TrustedChannelResolver
|
||||||
|
*/
|
||||||
|
public static TrustedChannelResolver getInstance(Configuration conf) {
|
||||||
|
Class<? extends TrustedChannelResolver> clazz =
|
||||||
|
conf.getClass(
|
||||||
|
DFSConfigKeys.DFS_TRUSTEDCHANNEL_RESOLVER_CLASS,
|
||||||
|
TrustedChannelResolver.class, TrustedChannelResolver.class);
|
||||||
|
return ReflectionUtils.newInstance(clazz, conf);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setConf(Configuration conf) {
|
||||||
|
this.conf = conf;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Configuration getConf() {
|
||||||
|
return conf;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Return boolean value indicating whether a channel is trusted or not
|
||||||
|
* from a client's perspective.
|
||||||
|
* @return true if the channel is trusted and false otherwise.
|
||||||
|
*/
|
||||||
|
public boolean isTrusted() {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Identify boolean value indicating whether a channel is trusted or not.
|
||||||
|
* @param peerAddress address of the peer
|
||||||
|
* @return true if the channel is trusted and false otherwise.
|
||||||
|
*/
|
||||||
|
public boolean isTrusted(InetAddress peerAddress) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
|
@ -35,6 +35,7 @@ import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
|
||||||
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
|
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
|
||||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||||
import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
|
import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver;
|
||||||
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
|
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
|
||||||
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
|
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
|
||||||
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
|
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
|
||||||
|
@ -71,6 +72,7 @@ class NameNodeConnector {
|
||||||
private BlockTokenSecretManager blockTokenSecretManager;
|
private BlockTokenSecretManager blockTokenSecretManager;
|
||||||
private Daemon keyupdaterthread; // AccessKeyUpdater thread
|
private Daemon keyupdaterthread; // AccessKeyUpdater thread
|
||||||
private DataEncryptionKey encryptionKey;
|
private DataEncryptionKey encryptionKey;
|
||||||
|
private final TrustedChannelResolver trustedChannelResolver;
|
||||||
|
|
||||||
NameNodeConnector(URI nameNodeUri,
|
NameNodeConnector(URI nameNodeUri,
|
||||||
Configuration conf) throws IOException {
|
Configuration conf) throws IOException {
|
||||||
|
@ -120,6 +122,7 @@ class NameNodeConnector {
|
||||||
if (out == null) {
|
if (out == null) {
|
||||||
throw new IOException("Another balancer is running");
|
throw new IOException("Another balancer is running");
|
||||||
}
|
}
|
||||||
|
this.trustedChannelResolver = TrustedChannelResolver.getInstance(conf);
|
||||||
}
|
}
|
||||||
|
|
||||||
boolean shouldContinue(long dispatchBlockMoveBytes) {
|
boolean shouldContinue(long dispatchBlockMoveBytes) {
|
||||||
|
@ -154,7 +157,7 @@ class NameNodeConnector {
|
||||||
|
|
||||||
DataEncryptionKey getDataEncryptionKey()
|
DataEncryptionKey getDataEncryptionKey()
|
||||||
throws IOException {
|
throws IOException {
|
||||||
if (encryptDataTransfer) {
|
if (encryptDataTransfer && !this.trustedChannelResolver.isTrusted()) {
|
||||||
synchronized (this) {
|
synchronized (this) {
|
||||||
if (encryptionKey == null) {
|
if (encryptionKey == null) {
|
||||||
encryptionKey = blockTokenSecretManager.generateDataEncryptionKey();
|
encryptionKey = blockTokenSecretManager.generateDataEncryptionKey();
|
||||||
|
|
|
@ -51,6 +51,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_RESTART_REPLICA_
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver;
|
||||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
|
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -82,6 +83,7 @@ public class DNConf {
|
||||||
|
|
||||||
final String minimumNameNodeVersion;
|
final String minimumNameNodeVersion;
|
||||||
final String encryptionAlgorithm;
|
final String encryptionAlgorithm;
|
||||||
|
final TrustedChannelResolver trustedChannelResolver;
|
||||||
|
|
||||||
final long xceiverStopTimeout;
|
final long xceiverStopTimeout;
|
||||||
final long restartReplicaExpiry;
|
final long restartReplicaExpiry;
|
||||||
|
@ -152,6 +154,7 @@ public class DNConf {
|
||||||
this.encryptDataTransfer = conf.getBoolean(DFS_ENCRYPT_DATA_TRANSFER_KEY,
|
this.encryptDataTransfer = conf.getBoolean(DFS_ENCRYPT_DATA_TRANSFER_KEY,
|
||||||
DFS_ENCRYPT_DATA_TRANSFER_DEFAULT);
|
DFS_ENCRYPT_DATA_TRANSFER_DEFAULT);
|
||||||
this.encryptionAlgorithm = conf.get(DFS_DATA_ENCRYPTION_ALGORITHM_KEY);
|
this.encryptionAlgorithm = conf.get(DFS_DATA_ENCRYPTION_ALGORITHM_KEY);
|
||||||
|
this.trustedChannelResolver = TrustedChannelResolver.getInstance(conf);
|
||||||
|
|
||||||
this.xceiverStopTimeout = conf.getLong(
|
this.xceiverStopTimeout = conf.getLong(
|
||||||
DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_KEY,
|
DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_KEY,
|
||||||
|
|
|
@ -1616,7 +1616,8 @@ public class DataNode extends Configured
|
||||||
HdfsServerConstants.WRITE_TIMEOUT_EXTENSION * (targets.length-1);
|
HdfsServerConstants.WRITE_TIMEOUT_EXTENSION * (targets.length-1);
|
||||||
OutputStream unbufOut = NetUtils.getOutputStream(sock, writeTimeout);
|
OutputStream unbufOut = NetUtils.getOutputStream(sock, writeTimeout);
|
||||||
InputStream unbufIn = NetUtils.getInputStream(sock);
|
InputStream unbufIn = NetUtils.getInputStream(sock);
|
||||||
if (dnConf.encryptDataTransfer) {
|
if (dnConf.encryptDataTransfer &&
|
||||||
|
!dnConf.trustedChannelResolver.isTrusted(sock.getInetAddress())) {
|
||||||
IOStreamPair encryptedStreams =
|
IOStreamPair encryptedStreams =
|
||||||
DataTransferEncryptor.getEncryptedStreams(
|
DataTransferEncryptor.getEncryptedStreams(
|
||||||
unbufOut, unbufIn,
|
unbufOut, unbufIn,
|
||||||
|
|
|
@ -36,9 +36,11 @@ import java.io.IOException;
|
||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
import java.io.InterruptedIOException;
|
import java.io.InterruptedIOException;
|
||||||
import java.io.OutputStream;
|
import java.io.OutputStream;
|
||||||
|
import java.net.InetAddress;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
import java.net.Socket;
|
import java.net.Socket;
|
||||||
import java.net.SocketException;
|
import java.net.SocketException;
|
||||||
|
import java.net.UnknownHostException;
|
||||||
import java.nio.channels.ClosedChannelException;
|
import java.nio.channels.ClosedChannelException;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
|
|
||||||
|
@ -81,6 +83,7 @@ import org.apache.hadoop.security.token.SecretManager.InvalidToken;
|
||||||
import org.apache.hadoop.security.token.Token;
|
import org.apache.hadoop.security.token.Token;
|
||||||
import org.apache.hadoop.util.DataChecksum;
|
import org.apache.hadoop.util.DataChecksum;
|
||||||
|
|
||||||
|
import com.google.common.net.InetAddresses;
|
||||||
import com.google.protobuf.ByteString;
|
import com.google.protobuf.ByteString;
|
||||||
|
|
||||||
|
|
||||||
|
@ -169,7 +172,8 @@ class DataXceiver extends Receiver implements Runnable {
|
||||||
dataXceiverServer.addPeer(peer, Thread.currentThread());
|
dataXceiverServer.addPeer(peer, Thread.currentThread());
|
||||||
peer.setWriteTimeout(datanode.getDnConf().socketWriteTimeout);
|
peer.setWriteTimeout(datanode.getDnConf().socketWriteTimeout);
|
||||||
InputStream input = socketIn;
|
InputStream input = socketIn;
|
||||||
if ((!peer.hasSecureChannel()) && dnConf.encryptDataTransfer) {
|
if ((!peer.hasSecureChannel()) && dnConf.encryptDataTransfer &&
|
||||||
|
!dnConf.trustedChannelResolver.isTrusted(getClientAddress(peer))){
|
||||||
IOStreamPair encryptedStreams = null;
|
IOStreamPair encryptedStreams = null;
|
||||||
try {
|
try {
|
||||||
encryptedStreams = DataTransferEncryptor.getEncryptedStreams(socketOut,
|
encryptedStreams = DataTransferEncryptor.getEncryptedStreams(socketOut,
|
||||||
|
@ -258,6 +262,19 @@ class DataXceiver extends Receiver implements Runnable {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns InetAddress from peer
|
||||||
|
* The getRemoteAddressString is the form /ip-address:port
|
||||||
|
* The ip-address is extracted from peer and InetAddress is formed
|
||||||
|
* @param peer
|
||||||
|
* @return
|
||||||
|
* @throws UnknownHostException
|
||||||
|
*/
|
||||||
|
private static InetAddress getClientAddress(Peer peer) {
|
||||||
|
return InetAddresses.forString(
|
||||||
|
peer.getRemoteAddressString().split(":")[0].substring(1));
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void requestShortCircuitFds(final ExtendedBlock blk,
|
public void requestShortCircuitFds(final ExtendedBlock blk,
|
||||||
final Token<BlockTokenIdentifier> token,
|
final Token<BlockTokenIdentifier> token,
|
||||||
|
@ -637,7 +654,8 @@ class DataXceiver extends Receiver implements Runnable {
|
||||||
OutputStream unbufMirrorOut = NetUtils.getOutputStream(mirrorSock,
|
OutputStream unbufMirrorOut = NetUtils.getOutputStream(mirrorSock,
|
||||||
writeTimeout);
|
writeTimeout);
|
||||||
InputStream unbufMirrorIn = NetUtils.getInputStream(mirrorSock);
|
InputStream unbufMirrorIn = NetUtils.getInputStream(mirrorSock);
|
||||||
if (dnConf.encryptDataTransfer) {
|
if (dnConf.encryptDataTransfer &&
|
||||||
|
!dnConf.trustedChannelResolver.isTrusted(mirrorSock.getInetAddress())) {
|
||||||
IOStreamPair encryptedStreams =
|
IOStreamPair encryptedStreams =
|
||||||
DataTransferEncryptor.getEncryptedStreams(
|
DataTransferEncryptor.getEncryptedStreams(
|
||||||
unbufMirrorOut, unbufMirrorIn,
|
unbufMirrorOut, unbufMirrorIn,
|
||||||
|
@ -963,7 +981,9 @@ class DataXceiver extends Receiver implements Runnable {
|
||||||
OutputStream unbufProxyOut = NetUtils.getOutputStream(proxySock,
|
OutputStream unbufProxyOut = NetUtils.getOutputStream(proxySock,
|
||||||
dnConf.socketWriteTimeout);
|
dnConf.socketWriteTimeout);
|
||||||
InputStream unbufProxyIn = NetUtils.getInputStream(proxySock);
|
InputStream unbufProxyIn = NetUtils.getInputStream(proxySock);
|
||||||
if (dnConf.encryptDataTransfer) {
|
if (dnConf.encryptDataTransfer &&
|
||||||
|
!dnConf.trustedChannelResolver.isTrusted(
|
||||||
|
proxySock.getInetAddress())) {
|
||||||
IOStreamPair encryptedStreams =
|
IOStreamPair encryptedStreams =
|
||||||
DataTransferEncryptor.getEncryptedStreams(
|
DataTransferEncryptor.getEncryptedStreams(
|
||||||
unbufProxyOut, unbufProxyIn,
|
unbufProxyOut, unbufProxyIn,
|
||||||
|
|
|
@ -1338,7 +1338,8 @@
|
||||||
<description>
|
<description>
|
||||||
Whether or not actual block data that is read/written from/to HDFS should
|
Whether or not actual block data that is read/written from/to HDFS should
|
||||||
be encrypted on the wire. This only needs to be set on the NN and DNs,
|
be encrypted on the wire. This only needs to be set on the NN and DNs,
|
||||||
clients will deduce this automatically.
|
clients will deduce this automatically. It is possible to override this setting
|
||||||
|
per connection by specifying custom logic via dfs.trustedchannel.resolver.class.
|
||||||
</description>
|
</description>
|
||||||
</property>
|
</property>
|
||||||
|
|
||||||
|
@ -1353,6 +1354,20 @@
|
||||||
</description>
|
</description>
|
||||||
</property>
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<name>dfs.trustedchannel.resolver.class</name>
|
||||||
|
<value></value>
|
||||||
|
<description>
|
||||||
|
TrustedChannelResolver is used to determine whether a channel
|
||||||
|
is trusted for plain data transfer. The TrustedChannelResolver is
|
||||||
|
invoked on both client and server side. If the resolver indicates
|
||||||
|
that the channel is trusted, then the data transfer will not be
|
||||||
|
encrypted even if dfs.encrypt.data.transfer is set to true. The
|
||||||
|
default implementation returns false indicating that the channel
|
||||||
|
is not trusted.
|
||||||
|
</description>
|
||||||
|
</property>
|
||||||
|
|
||||||
<property>
|
<property>
|
||||||
<name>dfs.datanode.hdfs-blocks-metadata.enabled</name>
|
<name>dfs.datanode.hdfs-blocks-metadata.enabled</name>
|
||||||
<value>false</value>
|
<value>false</value>
|
||||||
|
|
|
@ -23,6 +23,9 @@ import static org.junit.Assert.fail;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.OutputStream;
|
import java.io.OutputStream;
|
||||||
|
import java.net.InetAddress;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collection;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
|
@ -33,24 +36,40 @@ import org.apache.hadoop.fs.FileChecksum;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver;
|
||||||
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
|
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
|
||||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
|
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||||
import org.apache.hadoop.test.GenericTestUtils;
|
import org.apache.hadoop.test.GenericTestUtils;
|
||||||
import org.apache.hadoop.test.GenericTestUtils.LogCapturer;
|
import org.apache.hadoop.test.GenericTestUtils.LogCapturer;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
import org.junit.runner.RunWith;
|
||||||
|
import org.junit.runners.Parameterized;
|
||||||
|
import org.junit.runners.Parameterized.Parameters;
|
||||||
import org.mockito.Mockito;
|
import org.mockito.Mockito;
|
||||||
|
|
||||||
|
@RunWith(Parameterized.class)
|
||||||
public class TestEncryptedTransfer {
|
public class TestEncryptedTransfer {
|
||||||
|
|
||||||
|
@Parameters
|
||||||
|
public static Collection<Object[]> data() {
|
||||||
|
Collection<Object[]> params = new ArrayList<Object[]>();
|
||||||
|
params.add(new Object[]{null});
|
||||||
|
params.add(new Object[]{"org.apache.hadoop.hdfs.TestEncryptedTransfer$TestTrustedChannelResolver"});
|
||||||
|
return params;
|
||||||
|
}
|
||||||
|
|
||||||
private static final Log LOG = LogFactory.getLog(TestEncryptedTransfer.class);
|
private static final Log LOG = LogFactory.getLog(TestEncryptedTransfer.class);
|
||||||
|
|
||||||
private static final String PLAIN_TEXT = "this is very secret plain text";
|
private static final String PLAIN_TEXT = "this is very secret plain text";
|
||||||
private static final Path TEST_PATH = new Path("/non-encrypted-file");
|
private static final Path TEST_PATH = new Path("/non-encrypted-file");
|
||||||
|
|
||||||
private static void setEncryptionConfigKeys(Configuration conf) {
|
private void setEncryptionConfigKeys(Configuration conf) {
|
||||||
conf.setBoolean(DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY, true);
|
conf.setBoolean(DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY, true);
|
||||||
conf.setBoolean(DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);
|
conf.setBoolean(DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);
|
||||||
|
if (resolverClazz != null){
|
||||||
|
conf.set(DFSConfigKeys.DFS_TRUSTEDCHANNEL_RESOLVER_CLASS, resolverClazz);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Unset DFS_ENCRYPT_DATA_TRANSFER_KEY and DFS_DATA_ENCRYPTION_ALGORITHM_KEY
|
// Unset DFS_ENCRYPT_DATA_TRANSFER_KEY and DFS_DATA_ENCRYPTION_ALGORITHM_KEY
|
||||||
|
@ -63,6 +82,11 @@ public class TestEncryptedTransfer {
|
||||||
return FileSystem.get(localConf);
|
return FileSystem.get(localConf);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
String resolverClazz;
|
||||||
|
public TestEncryptedTransfer(String resolverClazz){
|
||||||
|
this.resolverClazz = resolverClazz;
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testEncryptedRead() throws IOException {
|
public void testEncryptedRead() throws IOException {
|
||||||
MiniDFSCluster cluster = null;
|
MiniDFSCluster cluster = null;
|
||||||
|
@ -206,7 +230,9 @@ public class TestEncryptedTransfer {
|
||||||
LogFactory.getLog(DataNode.class));
|
LogFactory.getLog(DataNode.class));
|
||||||
try {
|
try {
|
||||||
assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
|
assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
|
||||||
|
if (resolverClazz != null && !resolverClazz.endsWith("TestTrustedChannelResolver")){
|
||||||
fail("Should not have been able to read without encryption enabled.");
|
fail("Should not have been able to read without encryption enabled.");
|
||||||
|
}
|
||||||
} catch (IOException ioe) {
|
} catch (IOException ioe) {
|
||||||
GenericTestUtils.assertExceptionContains("Could not obtain block:",
|
GenericTestUtils.assertExceptionContains("Could not obtain block:",
|
||||||
ioe);
|
ioe);
|
||||||
|
@ -215,8 +241,10 @@ public class TestEncryptedTransfer {
|
||||||
}
|
}
|
||||||
fs.close();
|
fs.close();
|
||||||
|
|
||||||
|
if (resolverClazz != null && !resolverClazz.endsWith("TestTrustedChannelResolver")){
|
||||||
GenericTestUtils.assertMatches(logs.getOutput(),
|
GenericTestUtils.assertMatches(logs.getOutput(),
|
||||||
"Failed to read expected encryption handshake from client at");
|
"Failed to read expected encryption handshake from client at");
|
||||||
|
}
|
||||||
} finally {
|
} finally {
|
||||||
if (cluster != null) {
|
if (cluster != null) {
|
||||||
cluster.shutdown();
|
cluster.shutdown();
|
||||||
|
@ -456,4 +484,16 @@ public class TestEncryptedTransfer {
|
||||||
out.write(PLAIN_TEXT.getBytes());
|
out.write(PLAIN_TEXT.getBytes());
|
||||||
out.close();
|
out.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static class TestTrustedChannelResolver extends TrustedChannelResolver {
|
||||||
|
|
||||||
|
public boolean isTrusted(){
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isTrusted(InetAddress peerAddress){
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue