HDFS-2856. Merging change r1610474 from trunk to branch-2.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1610479 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Chris Nauroth 2014-07-14 18:28:02 +00:00
parent 6da4b7b4e6
commit ea8da4b8c0
32 changed files with 2041 additions and 739 deletions

View File

@ -34,6 +34,9 @@ Release 2.6.0 - UNRELEASED
HDFS-3851. DFSOutputStream class code cleanup. (Jing Zhao via suresh) HDFS-3851. DFSOutputStream class code cleanup. (Jing Zhao via suresh)
HDFS-2856. Fix block protocol so that Datanodes don't require root or jsvc.
(cnauroth)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES

View File

@ -145,6 +145,11 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd">
<artifactId>junit</artifactId> <artifactId>junit</artifactId>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-minikdc</artifactId>
<scope>test</scope>
</dependency>
<dependency> <dependency>
<groupId>org.mockito</groupId> <groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId> <artifactId>mockito-all</artifactId>

View File

@ -744,7 +744,8 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
} }
} }
try { try {
Peer peer = remotePeerFactory.newConnectedPeer(inetSocketAddress); Peer peer = remotePeerFactory.newConnectedPeer(inetSocketAddress, token,
datanode);
if (LOG.isTraceEnabled()) { if (LOG.isTraceEnabled()) {
LOG.trace("nextTcpPeer: created newConnectedPeer " + peer); LOG.trace("nextTcpPeer: created newConnectedPeer " + peer);
} }

View File

@ -17,6 +17,8 @@
*/ */
package org.apache.hadoop.hdfs; package org.apache.hadoop.hdfs;
import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT;
import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT;
@ -93,7 +95,6 @@ import com.google.common.collect.Lists;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.BlockStorageLocation; import org.apache.hadoop.fs.BlockStorageLocation;
@ -136,6 +137,7 @@ import org.apache.hadoop.hdfs.protocol.CachePoolIterator;
import org.apache.hadoop.hdfs.protocol.ClientProtocol; import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks; import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks;
import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException; import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.DirectoryListing; import org.apache.hadoop.hdfs.protocol.DirectoryListing;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@ -153,16 +155,19 @@ import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException;
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport; import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus; import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
import org.apache.hadoop.hdfs.protocol.UnresolvedPathException; import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor;
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair; import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
import org.apache.hadoop.hdfs.protocol.datatransfer.Op; import org.apache.hadoop.hdfs.protocol.datatransfer.Op;
import org.apache.hadoop.hdfs.protocol.datatransfer.ReplaceDatanodeOnFailure; import org.apache.hadoop.hdfs.protocol.datatransfer.ReplaceDatanodeOnFailure;
import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver; import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver;
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender; import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory;
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil;
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumResponseProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.protocolPB.PBHelper; import org.apache.hadoop.hdfs.protocolPB.PBHelper;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey; import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException; import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
@ -209,7 +214,8 @@ import com.google.common.net.InetAddresses;
* *
********************************************************/ ********************************************************/
@InterfaceAudience.Private @InterfaceAudience.Private
public class DFSClient implements java.io.Closeable, RemotePeerFactory { public class DFSClient implements java.io.Closeable, RemotePeerFactory,
DataEncryptionKeyFactory {
public static final Log LOG = LogFactory.getLog(DFSClient.class); public static final Log LOG = LogFactory.getLog(DFSClient.class);
public static final long SERVER_DEFAULTS_VALIDITY_PERIOD = 60 * 60 * 1000L; // 1 hour public static final long SERVER_DEFAULTS_VALIDITY_PERIOD = 60 * 60 * 1000L; // 1 hour
static final int TCP_WINDOW_SIZE = 128 * 1024; // 128 KB static final int TCP_WINDOW_SIZE = 128 * 1024; // 128 KB
@ -233,7 +239,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory {
private final Random r = new Random(); private final Random r = new Random();
private SocketAddress[] localInterfaceAddrs; private SocketAddress[] localInterfaceAddrs;
private DataEncryptionKey encryptionKey; private DataEncryptionKey encryptionKey;
final TrustedChannelResolver trustedChannelResolver; final SaslDataTransferClient saslClient;
private final CachingStrategy defaultReadCachingStrategy; private final CachingStrategy defaultReadCachingStrategy;
private final CachingStrategy defaultWriteCachingStrategy; private final CachingStrategy defaultWriteCachingStrategy;
private final ClientContext clientContext; private final ClientContext clientContext;
@ -635,7 +641,12 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory {
if (numThreads > 0) { if (numThreads > 0) {
this.initThreadsNumForHedgedReads(numThreads); this.initThreadsNumForHedgedReads(numThreads);
} }
this.trustedChannelResolver = TrustedChannelResolver.getInstance(getConfiguration()); this.saslClient = new SaslDataTransferClient(
DataTransferSaslUtil.getSaslPropertiesResolver(conf),
TrustedChannelResolver.getInstance(conf),
conf.getBoolean(
IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY,
IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT));
} }
/** /**
@ -1806,23 +1817,6 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory {
} }
} }
/**
* Get the checksum of the whole file of a range of the file. Note that the
* range always starts from the beginning of the file.
* @param src The file path
* @param length The length of the range
* @return The checksum
* @see DistributedFileSystem#getFileChecksum(Path)
*/
public MD5MD5CRC32FileChecksum getFileChecksum(String src, long length)
throws IOException {
checkOpen();
Preconditions.checkArgument(length >= 0);
return getFileChecksum(src, length, clientName, namenode,
socketFactory, dfsClientConf.socketTimeout, getDataEncryptionKey(),
dfsClientConf.connectToDnViaHostname);
}
@InterfaceAudience.Private @InterfaceAudience.Private
public void clearDataEncryptionKey() { public void clearDataEncryptionKey() {
LOG.debug("Clearing encryption key"); LOG.debug("Clearing encryption key");
@ -1841,11 +1835,9 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory {
return d == null ? false : d.getEncryptDataTransfer(); return d == null ? false : d.getEncryptDataTransfer();
} }
@InterfaceAudience.Private @Override
public DataEncryptionKey getDataEncryptionKey() public DataEncryptionKey newDataEncryptionKey() throws IOException {
throws IOException { if (shouldEncryptData()) {
if (shouldEncryptData() &&
!this.trustedChannelResolver.isTrusted()) {
synchronized (this) { synchronized (this) {
if (encryptionKey == null || if (encryptionKey == null ||
encryptionKey.expiryDate < Time.now()) { encryptionKey.expiryDate < Time.now()) {
@ -1860,22 +1852,17 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory {
} }
/** /**
* Get the checksum of the whole file or a range of the file. * Get the checksum of the whole file of a range of the file. Note that the
* range always starts from the beginning of the file.
* @param src The file path * @param src The file path
* @param length the length of the range, i.e., the range is [0, length] * @param length the length of the range, i.e., the range is [0, length]
* @param clientName the name of the client requesting the checksum.
* @param namenode the RPC proxy for the namenode
* @param socketFactory to create sockets to connect to DNs
* @param socketTimeout timeout to use when connecting and waiting for a response
* @param encryptionKey the key needed to communicate with DNs in this cluster
* @param connectToDnViaHostname whether the client should use hostnames instead of IPs
* @return The checksum * @return The checksum
* @see DistributedFileSystem#getFileChecksum(Path)
*/ */
private static MD5MD5CRC32FileChecksum getFileChecksum(String src, public MD5MD5CRC32FileChecksum getFileChecksum(String src, long length)
long length, String clientName, ClientProtocol namenode,
SocketFactory socketFactory, int socketTimeout,
DataEncryptionKey encryptionKey, boolean connectToDnViaHostname)
throws IOException { throws IOException {
checkOpen();
Preconditions.checkArgument(length >= 0);
//get block locations for the file range //get block locations for the file range
LocatedBlocks blockLocations = callGetBlockLocations(namenode, src, 0, LocatedBlocks blockLocations = callGetBlockLocations(namenode, src, 0,
length); length);
@ -1910,7 +1897,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory {
final DatanodeInfo[] datanodes = lb.getLocations(); final DatanodeInfo[] datanodes = lb.getLocations();
//try each datanode location of the block //try each datanode location of the block
final int timeout = 3000 * datanodes.length + socketTimeout; final int timeout = 3000 * datanodes.length + dfsClientConf.socketTimeout;
boolean done = false; boolean done = false;
for(int j = 0; !done && j < datanodes.length; j++) { for(int j = 0; !done && j < datanodes.length; j++) {
DataOutputStream out = null; DataOutputStream out = null;
@ -1918,8 +1905,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory {
try { try {
//connect to a datanode //connect to a datanode
IOStreamPair pair = connectToDN(socketFactory, connectToDnViaHostname, IOStreamPair pair = connectToDN(datanodes[j], timeout, lb);
encryptionKey, datanodes[j], timeout);
out = new DataOutputStream(new BufferedOutputStream(pair.out, out = new DataOutputStream(new BufferedOutputStream(pair.out,
HdfsConstants.SMALL_BUFFER_SIZE)); HdfsConstants.SMALL_BUFFER_SIZE));
in = new DataInputStream(pair.in); in = new DataInputStream(pair.in);
@ -1975,9 +1961,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory {
} else { } else {
LOG.debug("Retrieving checksum from an earlier-version DataNode: " + LOG.debug("Retrieving checksum from an earlier-version DataNode: " +
"inferring checksum by reading first byte"); "inferring checksum by reading first byte");
ct = inferChecksumTypeByReading( ct = inferChecksumTypeByReading(lb, datanodes[j]);
clientName, socketFactory, socketTimeout, lb, datanodes[j],
encryptionKey, connectToDnViaHostname);
} }
if (i == 0) { // first block if (i == 0) { // first block
@ -2051,16 +2035,13 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory {
* Connect to the given datanode's datantrasfer port, and return * Connect to the given datanode's datantrasfer port, and return
* the resulting IOStreamPair. This includes encryption wrapping, etc. * the resulting IOStreamPair. This includes encryption wrapping, etc.
*/ */
private static IOStreamPair connectToDN( private IOStreamPair connectToDN(DatanodeInfo dn, int timeout,
SocketFactory socketFactory, boolean connectToDnViaHostname, LocatedBlock lb) throws IOException {
DataEncryptionKey encryptionKey, DatanodeInfo dn, int timeout)
throws IOException
{
boolean success = false; boolean success = false;
Socket sock = null; Socket sock = null;
try { try {
sock = socketFactory.createSocket(); sock = socketFactory.createSocket();
String dnAddr = dn.getXferAddr(connectToDnViaHostname); String dnAddr = dn.getXferAddr(getConf().connectToDnViaHostname);
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Connecting to datanode " + dnAddr); LOG.debug("Connecting to datanode " + dnAddr);
} }
@ -2069,13 +2050,8 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory {
OutputStream unbufOut = NetUtils.getOutputStream(sock); OutputStream unbufOut = NetUtils.getOutputStream(sock);
InputStream unbufIn = NetUtils.getInputStream(sock); InputStream unbufIn = NetUtils.getInputStream(sock);
IOStreamPair ret; IOStreamPair ret = saslClient.newSocketSend(sock, unbufOut, unbufIn, this,
if (encryptionKey != null) { lb.getBlockToken(), dn);
ret = DataTransferEncryptor.getEncryptedStreams(
unbufOut, unbufIn, encryptionKey);
} else {
ret = new IOStreamPair(unbufIn, unbufOut);
}
success = true; success = true;
return ret; return ret;
} finally { } finally {
@ -2091,21 +2067,14 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory {
* with older HDFS versions which did not include the checksum type in * with older HDFS versions which did not include the checksum type in
* OpBlockChecksumResponseProto. * OpBlockChecksumResponseProto.
* *
* @param in input stream from datanode
* @param out output stream to datanode
* @param lb the located block * @param lb the located block
* @param clientName the name of the DFSClient requesting the checksum
* @param dn the connected datanode * @param dn the connected datanode
* @return the inferred checksum type * @return the inferred checksum type
* @throws IOException if an error occurs * @throws IOException if an error occurs
*/ */
private static Type inferChecksumTypeByReading( private Type inferChecksumTypeByReading(LocatedBlock lb, DatanodeInfo dn)
String clientName, SocketFactory socketFactory, int socketTimeout,
LocatedBlock lb, DatanodeInfo dn,
DataEncryptionKey encryptionKey, boolean connectToDnViaHostname)
throws IOException { throws IOException {
IOStreamPair pair = connectToDN(socketFactory, connectToDnViaHostname, IOStreamPair pair = connectToDN(dn, dfsClientConf.socketTimeout, lb);
encryptionKey, dn, socketTimeout);
try { try {
DataOutputStream out = new DataOutputStream(new BufferedOutputStream(pair.out, DataOutputStream out = new DataOutputStream(new BufferedOutputStream(pair.out,
@ -2858,7 +2827,9 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory {
} }
@Override // RemotePeerFactory @Override // RemotePeerFactory
public Peer newConnectedPeer(InetSocketAddress addr) throws IOException { public Peer newConnectedPeer(InetSocketAddress addr,
Token<BlockTokenIdentifier> blockToken, DatanodeID datanodeId)
throws IOException {
Peer peer = null; Peer peer = null;
boolean success = false; boolean success = false;
Socket sock = null; Socket sock = null;
@ -2867,8 +2838,8 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory {
NetUtils.connect(sock, addr, NetUtils.connect(sock, addr,
getRandomLocalInterfaceAddr(), getRandomLocalInterfaceAddr(),
dfsClientConf.socketTimeout); dfsClientConf.socketTimeout);
peer = TcpPeerServer.peerFromSocketAndKey(sock, peer = TcpPeerServer.peerFromSocketAndKey(saslClient, sock, this,
getDataEncryptionKey()); blockToken, datanodeId);
success = true; success = true;
return peer; return peer;
} finally { } finally {

View File

@ -565,6 +565,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final boolean DFS_ENCRYPT_DATA_TRANSFER_DEFAULT = false; public static final boolean DFS_ENCRYPT_DATA_TRANSFER_DEFAULT = false;
public static final String DFS_DATA_ENCRYPTION_ALGORITHM_KEY = "dfs.encrypt.data.transfer.algorithm"; public static final String DFS_DATA_ENCRYPTION_ALGORITHM_KEY = "dfs.encrypt.data.transfer.algorithm";
public static final String DFS_TRUSTEDCHANNEL_RESOLVER_CLASS = "dfs.trustedchannel.resolver.class"; public static final String DFS_TRUSTEDCHANNEL_RESOLVER_CLASS = "dfs.trustedchannel.resolver.class";
public static final String DFS_DATA_TRANSFER_PROTECTION_KEY = "dfs.data.transfer.protection";
public static final String DFS_DATA_TRANSFER_SASL_PROPS_RESOLVER_CLASS_KEY = "dfs.data.transfer.saslproperties.resolver.class";
// Journal-node related configs. These are read on the JN side. // Journal-node related configs. These are read on the JN side.
public static final String DFS_JOURNALNODE_EDITS_DIR_KEY = "dfs.journalnode.edits.dir"; public static final String DFS_JOURNALNODE_EDITS_DIR_KEY = "dfs.journalnode.edits.dir";

View File

@ -61,7 +61,6 @@ import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException; import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException;
import org.apache.hadoop.hdfs.protocol.UnresolvedPathException; import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage; import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol; import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair; import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException; import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
@ -1047,14 +1046,10 @@ public class DFSOutputStream extends FSOutputSummer
OutputStream unbufOut = NetUtils.getOutputStream(sock, writeTimeout); OutputStream unbufOut = NetUtils.getOutputStream(sock, writeTimeout);
InputStream unbufIn = NetUtils.getInputStream(sock); InputStream unbufIn = NetUtils.getInputStream(sock);
if (dfsClient.shouldEncryptData() && IOStreamPair saslStreams = dfsClient.saslClient.socketSend(sock,
!dfsClient.trustedChannelResolver.isTrusted(sock.getInetAddress())) { unbufOut, unbufIn, dfsClient, blockToken, src);
IOStreamPair encryptedStreams = unbufOut = saslStreams.out;
DataTransferEncryptor.getEncryptedStreams( unbufIn = saslStreams.in;
unbufOut, unbufIn, dfsClient.getDataEncryptionKey());
unbufOut = encryptedStreams.out;
unbufIn = encryptedStreams.in;
}
out = new DataOutputStream(new BufferedOutputStream(unbufOut, out = new DataOutputStream(new BufferedOutputStream(unbufOut,
HdfsConstants.SMALL_BUFFER_SIZE)); HdfsConstants.SMALL_BUFFER_SIZE));
in = new DataInputStream(unbufIn); in = new DataInputStream(unbufIn);
@ -1325,14 +1320,10 @@ public class DFSOutputStream extends FSOutputSummer
OutputStream unbufOut = NetUtils.getOutputStream(s, writeTimeout); OutputStream unbufOut = NetUtils.getOutputStream(s, writeTimeout);
InputStream unbufIn = NetUtils.getInputStream(s); InputStream unbufIn = NetUtils.getInputStream(s);
if (dfsClient.shouldEncryptData() && IOStreamPair saslStreams = dfsClient.saslClient.socketSend(s,
!dfsClient.trustedChannelResolver.isTrusted(s.getInetAddress())) { unbufOut, unbufIn, dfsClient, accessToken, nodes[0]);
IOStreamPair encryptedStreams = unbufOut = saslStreams.out;
DataTransferEncryptor.getEncryptedStreams(unbufOut, unbufIn = saslStreams.in;
unbufIn, dfsClient.getDataEncryptionKey());
unbufOut = encryptedStreams.out;
unbufIn = encryptedStreams.in;
}
out = new DataOutputStream(new BufferedOutputStream(unbufOut, out = new DataOutputStream(new BufferedOutputStream(unbufOut,
HdfsConstants.SMALL_BUFFER_SIZE)); HdfsConstants.SMALL_BUFFER_SIZE));
blockReplyStream = new DataInputStream(unbufIn); blockReplyStream = new DataInputStream(unbufIn);

View File

@ -21,15 +21,21 @@ import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import org.apache.hadoop.hdfs.net.Peer; import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.security.token.Token;
public interface RemotePeerFactory { public interface RemotePeerFactory {
/** /**
* @param addr The address to connect to. * @param addr The address to connect to.
* * @param blockToken Token used during optional SASL negotiation
* @param datanodeId ID of destination DataNode
* @return A new Peer connected to the address. * @return A new Peer connected to the address.
* *
* @throws IOException If there was an error connecting or creating * @throws IOException If there was an error connecting or creating
* the remote socket, encrypted stream, etc. * the remote socket, encrypted stream, etc.
*/ */
Peer newConnectedPeer(InetSocketAddress addr) throws IOException; Peer newConnectedPeer(InetSocketAddress addr,
Token<BlockTokenIdentifier> blockToken, DatanodeID datanodeId)
throws IOException;
} }

View File

@ -19,9 +19,7 @@ package org.apache.hadoop.hdfs.net;
import java.io.IOException; import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor;
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair; import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
import org.apache.hadoop.net.unix.DomainSocket; import org.apache.hadoop.net.unix.DomainSocket;
import java.io.InputStream; import java.io.InputStream;
@ -51,11 +49,8 @@ public class EncryptedPeer implements Peer {
*/ */
private final ReadableByteChannel channel; private final ReadableByteChannel channel;
public EncryptedPeer(Peer enclosedPeer, DataEncryptionKey key) public EncryptedPeer(Peer enclosedPeer, IOStreamPair ios) {
throws IOException {
this.enclosedPeer = enclosedPeer; this.enclosedPeer = enclosedPeer;
IOStreamPair ios = DataTransferEncryptor.getEncryptedStreams(
enclosedPeer.getOutputStream(), enclosedPeer.getInputStream(), key);
this.in = ios.in; this.in = ios.in;
this.out = ios.out; this.out = ios.out;
this.channel = ios.in instanceof ReadableByteChannel ? this.channel = ios.in instanceof ReadableByteChannel ?

View File

@ -28,10 +28,14 @@ import java.nio.channels.SocketChannel;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey; import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory;
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter.SecureResources; import org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter.SecureResources;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.Server; import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.security.token.Token;
@InterfaceAudience.Private @InterfaceAudience.Private
public class TcpPeerServer implements PeerServer { public class TcpPeerServer implements PeerServer {
@ -74,15 +78,16 @@ public class TcpPeerServer implements PeerServer {
} }
} }
public static Peer peerFromSocketAndKey(Socket s, public static Peer peerFromSocketAndKey(
DataEncryptionKey key) throws IOException { SaslDataTransferClient saslClient, Socket s,
DataEncryptionKeyFactory keyFactory,
Token<BlockTokenIdentifier> blockToken, DatanodeID datanodeId)
throws IOException {
Peer peer = null; Peer peer = null;
boolean success = false; boolean success = false;
try { try {
peer = peerFromSocket(s); peer = peerFromSocket(s);
if (key != null) { peer = saslClient.peerSend(peer, keyFactory, blockToken, datanodeId);
peer = new EncryptedPeer(peer, key);
}
success = true; success = true;
return peer; return peer;
} finally { } finally {

View File

@ -1,506 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.protocol.datatransfer;
import static org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Map;
import java.util.TreeMap;
import javax.security.auth.callback.Callback;
import javax.security.auth.callback.CallbackHandler;
import javax.security.auth.callback.NameCallback;
import javax.security.auth.callback.PasswordCallback;
import javax.security.auth.callback.UnsupportedCallbackException;
import javax.security.sasl.AuthorizeCallback;
import javax.security.sasl.RealmCallback;
import javax.security.sasl.RealmChoiceCallback;
import javax.security.sasl.Sasl;
import javax.security.sasl.SaslClient;
import javax.security.sasl.SaslException;
import javax.security.sasl.SaslServer;
import org.apache.commons.codec.binary.Base64;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto.DataTransferEncryptorStatus;
import org.apache.hadoop.hdfs.security.token.block.BlockPoolTokenSecretManager;
import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
import org.apache.hadoop.security.SaslInputStream;
import org.apache.hadoop.security.SaslOutputStream;
import com.google.common.base.Charsets;
import com.google.common.collect.Maps;
import com.google.protobuf.ByteString;
/**
* A class which, given connected input/output streams, will perform a
* handshake using those streams based on SASL to produce new Input/Output
* streams which will encrypt/decrypt all data written/read from said streams.
* Much of this is inspired by or borrowed from the TSaslTransport in Apache
* Thrift, but with some HDFS-specific tweaks.
*/
@InterfaceAudience.Private
public class DataTransferEncryptor {
public static final Log LOG = LogFactory.getLog(DataTransferEncryptor.class);
/**
* Sent by clients and validated by servers. We use a number that's unlikely
* to ever be sent as the value of the DATA_TRANSFER_VERSION.
*/
private static final int ENCRYPTED_TRANSFER_MAGIC_NUMBER = 0xDEADBEEF;
/**
* Delimiter for the three-part SASL username string.
*/
private static final String NAME_DELIMITER = " ";
// This has to be set as part of the SASL spec, but it don't matter for
// our purposes, but may not be empty. It's sent over the wire, so use
// a short string.
private static final String SERVER_NAME = "0";
private static final String PROTOCOL = "hdfs";
private static final String MECHANISM = "DIGEST-MD5";
private static final Map<String, String> SASL_PROPS = new TreeMap<String, String>();
static {
SASL_PROPS.put(Sasl.QOP, "auth-conf");
SASL_PROPS.put(Sasl.SERVER_AUTH, "true");
}
/**
* Factory method for DNs, where the nonce, keyId, and encryption key are not
* yet known. The nonce and keyId will be sent by the client, and the DN
* will then use those pieces of info and the secret key shared with the NN
* to determine the encryptionKey used for the SASL handshake/encryption.
*
* Establishes a secure connection assuming that the party on the other end
* has the same shared secret. This does a SASL connection handshake, but not
* a general-purpose one. It's specific to the MD5-DIGEST SASL mechanism with
* auth-conf enabled. In particular, it doesn't support an arbitrary number of
* challenge/response rounds, and we know that the client will never have an
* initial response, so we don't check for one.
*
* @param underlyingOut output stream to write to the other party
* @param underlyingIn input stream to read from the other party
* @param blockPoolTokenSecretManager secret manager capable of constructing
* encryption key based on keyId, blockPoolId, and nonce
* @return a pair of streams which wrap the given streams and encrypt/decrypt
* all data read/written
* @throws IOException in the event of error
*/
public static IOStreamPair getEncryptedStreams(
OutputStream underlyingOut, InputStream underlyingIn,
BlockPoolTokenSecretManager blockPoolTokenSecretManager,
String encryptionAlgorithm) throws IOException {
DataInputStream in = new DataInputStream(underlyingIn);
DataOutputStream out = new DataOutputStream(underlyingOut);
Map<String, String> saslProps = Maps.newHashMap(SASL_PROPS);
saslProps.put("com.sun.security.sasl.digest.cipher", encryptionAlgorithm);
if (LOG.isDebugEnabled()) {
LOG.debug("Server using encryption algorithm " + encryptionAlgorithm);
}
SaslParticipant sasl = new SaslParticipant(Sasl.createSaslServer(MECHANISM,
PROTOCOL, SERVER_NAME, saslProps,
new SaslServerCallbackHandler(blockPoolTokenSecretManager)));
int magicNumber = in.readInt();
if (magicNumber != ENCRYPTED_TRANSFER_MAGIC_NUMBER) {
throw new InvalidMagicNumberException(magicNumber);
}
try {
// step 1
performSaslStep1(out, in, sasl);
// step 2 (server-side only)
byte[] remoteResponse = readSaslMessage(in);
byte[] localResponse = sasl.evaluateChallengeOrResponse(remoteResponse);
sendSaslMessage(out, localResponse);
// SASL handshake is complete
checkSaslComplete(sasl);
return sasl.createEncryptedStreamPair(out, in);
} catch (IOException ioe) {
if (ioe instanceof SaslException &&
ioe.getCause() != null &&
ioe.getCause() instanceof InvalidEncryptionKeyException) {
// This could just be because the client is long-lived and hasn't gotten
// a new encryption key from the NN in a while. Upon receiving this
// error, the client will get a new encryption key from the NN and retry
// connecting to this DN.
sendInvalidKeySaslErrorMessage(out, ioe.getCause().getMessage());
} else {
sendGenericSaslErrorMessage(out, ioe.getMessage());
}
throw ioe;
}
}
/**
* Factory method for clients, where the encryption token is already created.
*
* Establishes a secure connection assuming that the party on the other end
* has the same shared secret. This does a SASL connection handshake, but not
* a general-purpose one. It's specific to the MD5-DIGEST SASL mechanism with
* auth-conf enabled. In particular, it doesn't support an arbitrary number of
* challenge/response rounds, and we know that the client will never have an
* initial response, so we don't check for one.
*
* @param underlyingOut output stream to write to the other party
* @param underlyingIn input stream to read from the other party
* @param encryptionKey all info required to establish an encrypted stream
* @return a pair of streams which wrap the given streams and encrypt/decrypt
* all data read/written
* @throws IOException in the event of error
*/
public static IOStreamPair getEncryptedStreams(
OutputStream underlyingOut, InputStream underlyingIn,
DataEncryptionKey encryptionKey)
throws IOException {
Map<String, String> saslProps = Maps.newHashMap(SASL_PROPS);
saslProps.put("com.sun.security.sasl.digest.cipher",
encryptionKey.encryptionAlgorithm);
if (LOG.isDebugEnabled()) {
LOG.debug("Client using encryption algorithm " +
encryptionKey.encryptionAlgorithm);
}
DataOutputStream out = new DataOutputStream(underlyingOut);
DataInputStream in = new DataInputStream(underlyingIn);
String userName = getUserNameFromEncryptionKey(encryptionKey);
SaslParticipant sasl = new SaslParticipant(Sasl.createSaslClient(
new String[] { MECHANISM }, userName, PROTOCOL, SERVER_NAME, saslProps,
new SaslClientCallbackHandler(encryptionKey.encryptionKey, userName)));
out.writeInt(ENCRYPTED_TRANSFER_MAGIC_NUMBER);
out.flush();
try {
// Start of handshake - "initial response" in SASL terminology.
sendSaslMessage(out, new byte[0]);
// step 1
performSaslStep1(out, in, sasl);
// step 2 (client-side only)
byte[] remoteResponse = readSaslMessage(in);
byte[] localResponse = sasl.evaluateChallengeOrResponse(remoteResponse);
assert localResponse == null;
// SASL handshake is complete
checkSaslComplete(sasl);
return sasl.createEncryptedStreamPair(out, in);
} catch (IOException ioe) {
sendGenericSaslErrorMessage(out, ioe.getMessage());
throw ioe;
}
}
private static void performSaslStep1(DataOutputStream out, DataInputStream in,
SaslParticipant sasl) throws IOException {
byte[] remoteResponse = readSaslMessage(in);
byte[] localResponse = sasl.evaluateChallengeOrResponse(remoteResponse);
sendSaslMessage(out, localResponse);
}
private static void checkSaslComplete(SaslParticipant sasl) throws IOException {
if (!sasl.isComplete()) {
throw new IOException("Failed to complete SASL handshake");
}
if (!sasl.supportsConfidentiality()) {
throw new IOException("SASL handshake completed, but channel does not " +
"support encryption");
}
}
private static void sendSaslMessage(DataOutputStream out, byte[] payload)
throws IOException {
sendSaslMessage(out, DataTransferEncryptorStatus.SUCCESS, payload, null);
}
private static void sendInvalidKeySaslErrorMessage(DataOutputStream out,
String message) throws IOException {
sendSaslMessage(out, DataTransferEncryptorStatus.ERROR_UNKNOWN_KEY, null,
message);
}
private static void sendGenericSaslErrorMessage(DataOutputStream out,
String message) throws IOException {
sendSaslMessage(out, DataTransferEncryptorStatus.ERROR, null, message);
}
private static void sendSaslMessage(OutputStream out,
DataTransferEncryptorStatus status, byte[] payload, String message)
throws IOException {
DataTransferEncryptorMessageProto.Builder builder =
DataTransferEncryptorMessageProto.newBuilder();
builder.setStatus(status);
if (payload != null) {
builder.setPayload(ByteString.copyFrom(payload));
}
if (message != null) {
builder.setMessage(message);
}
DataTransferEncryptorMessageProto proto = builder.build();
proto.writeDelimitedTo(out);
out.flush();
}
private static byte[] readSaslMessage(DataInputStream in) throws IOException {
DataTransferEncryptorMessageProto proto =
DataTransferEncryptorMessageProto.parseFrom(vintPrefixed(in));
if (proto.getStatus() == DataTransferEncryptorStatus.ERROR_UNKNOWN_KEY) {
throw new InvalidEncryptionKeyException(proto.getMessage());
} else if (proto.getStatus() == DataTransferEncryptorStatus.ERROR) {
throw new IOException(proto.getMessage());
} else {
return proto.getPayload().toByteArray();
}
}
/**
* Set the encryption key when asked by the server-side SASL object.
*/
private static class SaslServerCallbackHandler implements CallbackHandler {
private final BlockPoolTokenSecretManager blockPoolTokenSecretManager;
public SaslServerCallbackHandler(BlockPoolTokenSecretManager
blockPoolTokenSecretManager) {
this.blockPoolTokenSecretManager = blockPoolTokenSecretManager;
}
@Override
public void handle(Callback[] callbacks) throws IOException,
UnsupportedCallbackException {
NameCallback nc = null;
PasswordCallback pc = null;
AuthorizeCallback ac = null;
for (Callback callback : callbacks) {
if (callback instanceof AuthorizeCallback) {
ac = (AuthorizeCallback) callback;
} else if (callback instanceof PasswordCallback) {
pc = (PasswordCallback) callback;
} else if (callback instanceof NameCallback) {
nc = (NameCallback) callback;
} else if (callback instanceof RealmCallback) {
continue; // realm is ignored
} else {
throw new UnsupportedCallbackException(callback,
"Unrecognized SASL DIGEST-MD5 Callback: " + callback);
}
}
if (pc != null) {
byte[] encryptionKey = getEncryptionKeyFromUserName(
blockPoolTokenSecretManager, nc.getDefaultName());
pc.setPassword(encryptionKeyToPassword(encryptionKey));
}
if (ac != null) {
ac.setAuthorized(true);
ac.setAuthorizedID(ac.getAuthorizationID());
}
}
}
/**
* Set the encryption key when asked by the client-side SASL object.
*/
private static class SaslClientCallbackHandler implements CallbackHandler {
private final byte[] encryptionKey;
private final String userName;
public SaslClientCallbackHandler(byte[] encryptionKey, String userName) {
this.encryptionKey = encryptionKey;
this.userName = userName;
}
@Override
public void handle(Callback[] callbacks) throws IOException,
UnsupportedCallbackException {
NameCallback nc = null;
PasswordCallback pc = null;
RealmCallback rc = null;
for (Callback callback : callbacks) {
if (callback instanceof RealmChoiceCallback) {
continue;
} else if (callback instanceof NameCallback) {
nc = (NameCallback) callback;
} else if (callback instanceof PasswordCallback) {
pc = (PasswordCallback) callback;
} else if (callback instanceof RealmCallback) {
rc = (RealmCallback) callback;
} else {
throw new UnsupportedCallbackException(callback,
"Unrecognized SASL client callback");
}
}
if (nc != null) {
nc.setName(userName);
}
if (pc != null) {
pc.setPassword(encryptionKeyToPassword(encryptionKey));
}
if (rc != null) {
rc.setText(rc.getDefaultText());
}
}
}
/**
* The SASL username consists of the keyId, blockPoolId, and nonce with the
* first two encoded as Strings, and the third encoded using Base64. The
* fields are each separated by a single space.
*
* @param encryptionKey the encryption key to encode as a SASL username.
* @return encoded username containing keyId, blockPoolId, and nonce
*/
private static String getUserNameFromEncryptionKey(
DataEncryptionKey encryptionKey) {
return encryptionKey.keyId + NAME_DELIMITER +
encryptionKey.blockPoolId + NAME_DELIMITER +
new String(Base64.encodeBase64(encryptionKey.nonce, false), Charsets.UTF_8);
}
/**
* Given a secret manager and a username encoded as described above, determine
* the encryption key.
*
* @param blockPoolTokenSecretManager to determine the encryption key.
* @param userName containing the keyId, blockPoolId, and nonce.
* @return secret encryption key.
* @throws IOException
*/
private static byte[] getEncryptionKeyFromUserName(
BlockPoolTokenSecretManager blockPoolTokenSecretManager, String userName)
throws IOException {
String[] nameComponents = userName.split(NAME_DELIMITER);
if (nameComponents.length != 3) {
throw new IOException("Provided name '" + userName + "' has " +
nameComponents.length + " components instead of the expected 3.");
}
int keyId = Integer.parseInt(nameComponents[0]);
String blockPoolId = nameComponents[1];
byte[] nonce = Base64.decodeBase64(nameComponents[2]);
return blockPoolTokenSecretManager.retrieveDataEncryptionKey(keyId,
blockPoolId, nonce);
}
private static char[] encryptionKeyToPassword(byte[] encryptionKey) {
return new String(Base64.encodeBase64(encryptionKey, false), Charsets.UTF_8).toCharArray();
}
/**
* Strongly inspired by Thrift's TSaslTransport class.
*
* Used to abstract over the <code>SaslServer</code> and
* <code>SaslClient</code> classes, which share a lot of their interface, but
* unfortunately don't share a common superclass.
*/
private static class SaslParticipant {
// One of these will always be null.
public SaslServer saslServer;
public SaslClient saslClient;
public SaslParticipant(SaslServer saslServer) {
this.saslServer = saslServer;
}
public SaslParticipant(SaslClient saslClient) {
this.saslClient = saslClient;
}
public byte[] evaluateChallengeOrResponse(byte[] challengeOrResponse) throws SaslException {
if (saslClient != null) {
return saslClient.evaluateChallenge(challengeOrResponse);
} else {
return saslServer.evaluateResponse(challengeOrResponse);
}
}
public boolean isComplete() {
if (saslClient != null)
return saslClient.isComplete();
else
return saslServer.isComplete();
}
public boolean supportsConfidentiality() {
String qop = null;
if (saslClient != null) {
qop = (String) saslClient.getNegotiatedProperty(Sasl.QOP);
} else {
qop = (String) saslServer.getNegotiatedProperty(Sasl.QOP);
}
return qop != null && qop.equals("auth-conf");
}
// Return some input/output streams that will henceforth have their
// communication encrypted.
private IOStreamPair createEncryptedStreamPair(
DataOutputStream out, DataInputStream in) {
if (saslClient != null) {
return new IOStreamPair(
new SaslInputStream(in, saslClient),
new SaslOutputStream(out, saslClient));
} else {
return new IOStreamPair(
new SaslInputStream(in, saslServer),
new SaslOutputStream(out, saslServer));
}
}
}
@InterfaceAudience.Private
public static class InvalidMagicNumberException extends IOException {
private static final long serialVersionUID = 1L;
public InvalidMagicNumberException(int magicNumber) {
super(String.format("Received %x instead of %x from client.",
magicNumber, ENCRYPTED_TRANSFER_MAGIC_NUMBER));
}
}
}

View File

@ -0,0 +1,38 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.protocol.datatransfer.sasl;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
/**
* Creates a new {@link DataEncryptionKey} on demand.
*/
@InterfaceAudience.Private
public interface DataEncryptionKeyFactory {
/**
* Creates a new DataEncryptionKey.
*
* @return DataEncryptionKey newly created
* @throws IOException for any error
*/
DataEncryptionKey newDataEncryptionKey() throws IOException;
}

View File

@ -0,0 +1,267 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.protocol.datatransfer.sasl;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_RPC_PROTECTION;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_SASL_PROPS_RESOLVER_CLASS;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_TRANSFER_SASL_PROPS_RESOLVER_CLASS_KEY;
import static org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetAddress;
import java.util.Arrays;
import java.util.Map;
import java.util.Set;
import javax.security.sasl.Sasl;
import org.apache.commons.codec.binary.Base64;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto.DataTransferEncryptorStatus;
import org.apache.hadoop.security.SaslPropertiesResolver;
import org.apache.hadoop.security.SaslRpcServer.QualityOfProtection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Charsets;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import com.google.common.net.InetAddresses;
import com.google.protobuf.ByteString;
/**
* Utility methods implementing SASL negotiation for DataTransferProtocol.
*/
@InterfaceAudience.Private
public final class DataTransferSaslUtil {
private static final Logger LOG = LoggerFactory.getLogger(
DataTransferSaslUtil.class);
/**
* Delimiter for the three-part SASL username string.
*/
public static final String NAME_DELIMITER = " ";
/**
* Sent by clients and validated by servers. We use a number that's unlikely
* to ever be sent as the value of the DATA_TRANSFER_VERSION.
*/
public static final int SASL_TRANSFER_MAGIC_NUMBER = 0xDEADBEEF;
/**
* Checks that SASL negotiation has completed for the given participant, and
* the negotiated quality of protection is included in the given SASL
* properties and therefore acceptable.
*
* @param sasl participant to check
* @param saslProps properties of SASL negotiation
* @throws IOException for any error
*/
public static void checkSaslComplete(SaslParticipant sasl,
Map<String, String> saslProps) throws IOException {
if (!sasl.isComplete()) {
throw new IOException("Failed to complete SASL handshake");
}
Set<String> requestedQop = ImmutableSet.copyOf(Arrays.asList(
saslProps.get(Sasl.QOP).split(",")));
String negotiatedQop = sasl.getNegotiatedQop();
LOG.debug("Verifying QOP, requested QOP = {}, negotiated QOP = {}",
requestedQop, negotiatedQop);
if (!requestedQop.contains(negotiatedQop)) {
throw new IOException(String.format("SASL handshake completed, but " +
"channel does not have acceptable quality of protection, " +
"requested = %s, negotiated = %s", requestedQop, negotiatedQop));
}
}
/**
* Creates SASL properties required for an encrypted SASL negotiation.
*
* @param encryptionAlgorithm to use for SASL negotation
* @return properties of encrypted SASL negotiation
*/
public static Map<String, String> createSaslPropertiesForEncryption(
String encryptionAlgorithm) {
Map<String, String> saslProps = Maps.newHashMapWithExpectedSize(3);
saslProps.put(Sasl.QOP, QualityOfProtection.PRIVACY.getSaslQop());
saslProps.put(Sasl.SERVER_AUTH, "true");
saslProps.put("com.sun.security.sasl.digest.cipher", encryptionAlgorithm);
return saslProps;
}
/**
* For an encrypted SASL negotiation, encodes an encryption key to a SASL
* password.
*
* @param encryptionKey to encode
* @return key encoded as SASL password
*/
public static char[] encryptionKeyToPassword(byte[] encryptionKey) {
return new String(Base64.encodeBase64(encryptionKey, false), Charsets.UTF_8)
.toCharArray();
}
/**
* Returns InetAddress from peer. The getRemoteAddressString has the form
* [host][/ip-address]:port. The host may be missing. The IP address (and
* preceding '/') may be missing. The port preceded by ':' is always present.
*
* @param peer
* @return InetAddress from peer
*/
public static InetAddress getPeerAddress(Peer peer) {
String remoteAddr = peer.getRemoteAddressString().split(":")[0];
int slashIdx = remoteAddr.indexOf('/');
return InetAddresses.forString(slashIdx != -1 ?
remoteAddr.substring(slashIdx + 1, remoteAddr.length()) :
remoteAddr);
}
/**
* Creates a SaslPropertiesResolver from the given configuration. This method
* works by cloning the configuration, translating configuration properties
* specific to DataTransferProtocol to what SaslPropertiesResolver expects,
* and then delegating to SaslPropertiesResolver for initialization. This
* method returns null if SASL protection has not been configured for
* DataTransferProtocol.
*
* @param conf configuration to read
* @return SaslPropertiesResolver for DataTransferProtocol, or null if not
* configured
*/
public static SaslPropertiesResolver getSaslPropertiesResolver(
Configuration conf) {
String qops = conf.get(DFS_DATA_TRANSFER_PROTECTION_KEY);
if (qops == null || qops.isEmpty()) {
LOG.debug("DataTransferProtocol not using SaslPropertiesResolver, no " +
"QOP found in configuration for {}", DFS_DATA_TRANSFER_PROTECTION_KEY);
return null;
}
Configuration saslPropsResolverConf = new Configuration(conf);
saslPropsResolverConf.set(HADOOP_RPC_PROTECTION, qops);
Class<? extends SaslPropertiesResolver> resolverClass = conf.getClass(
DFS_DATA_TRANSFER_SASL_PROPS_RESOLVER_CLASS_KEY,
SaslPropertiesResolver.class, SaslPropertiesResolver.class);
saslPropsResolverConf.setClass(HADOOP_SECURITY_SASL_PROPS_RESOLVER_CLASS,
resolverClass, SaslPropertiesResolver.class);
SaslPropertiesResolver resolver = SaslPropertiesResolver.getInstance(
saslPropsResolverConf);
LOG.debug("DataTransferProtocol using SaslPropertiesResolver, configured " +
"QOP {} = {}, configured class {} = {}", DFS_DATA_TRANSFER_PROTECTION_KEY, qops,
DFS_DATA_TRANSFER_SASL_PROPS_RESOLVER_CLASS_KEY, resolverClass);
return resolver;
}
/**
* Performs the first step of SASL negotiation.
*
* @param out connection output stream
* @param in connection input stream
* @param sasl participant
*/
public static void performSaslStep1(OutputStream out, InputStream in,
SaslParticipant sasl) throws IOException {
byte[] remoteResponse = readSaslMessage(in);
byte[] localResponse = sasl.evaluateChallengeOrResponse(remoteResponse);
sendSaslMessage(out, localResponse);
}
/**
* Reads a SASL negotiation message.
*
* @param in stream to read
* @return bytes of SASL negotiation messsage
* @throws IOException for any error
*/
public static byte[] readSaslMessage(InputStream in) throws IOException {
DataTransferEncryptorMessageProto proto =
DataTransferEncryptorMessageProto.parseFrom(vintPrefixed(in));
if (proto.getStatus() == DataTransferEncryptorStatus.ERROR_UNKNOWN_KEY) {
throw new InvalidEncryptionKeyException(proto.getMessage());
} else if (proto.getStatus() == DataTransferEncryptorStatus.ERROR) {
throw new IOException(proto.getMessage());
} else {
return proto.getPayload().toByteArray();
}
}
/**
* Sends a SASL negotiation message indicating an error.
*
* @param out stream to receive message
* @param message to send
* @throws IOException for any error
*/
public static void sendGenericSaslErrorMessage(OutputStream out,
String message) throws IOException {
sendSaslMessage(out, DataTransferEncryptorStatus.ERROR, null, message);
}
/**
* Sends a SASL negotiation message.
*
* @param out stream to receive message
* @param payload to send
* @throws IOException for any error
*/
public static void sendSaslMessage(OutputStream out, byte[] payload)
throws IOException {
sendSaslMessage(out, DataTransferEncryptorStatus.SUCCESS, payload, null);
}
/**
* Sends a SASL negotiation message.
*
* @param out stream to receive message
* @param status negotiation status
* @param payload to send
* @param message to send
* @throws IOException for any error
*/
public static void sendSaslMessage(OutputStream out,
DataTransferEncryptorStatus status, byte[] payload, String message)
throws IOException {
DataTransferEncryptorMessageProto.Builder builder =
DataTransferEncryptorMessageProto.newBuilder();
builder.setStatus(status);
if (payload != null) {
builder.setPayload(ByteString.copyFrom(payload));
}
if (message != null) {
builder.setMessage(message);
}
DataTransferEncryptorMessageProto proto = builder.build();
proto.writeDelimitedTo(out);
out.flush();
}
/**
* There is no reason to instantiate this class.
*/
private DataTransferSaslUtil() {
}
}

View File

@ -0,0 +1,44 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.protocol.datatransfer.sasl;
import static org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil.SASL_TRANSFER_MAGIC_NUMBER;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
/**
* Indicates that SASL protocol negotiation expected to read a pre-defined magic
* number, but the expected value was not seen.
*/
@InterfaceAudience.Private
public class InvalidMagicNumberException extends IOException {
private static final long serialVersionUID = 1L;
/**
* Creates a new InvalidMagicNumberException.
*
* @param magicNumber expected value
*/
public InvalidMagicNumberException(int magicNumber) {
super(String.format("Received %x instead of %x from client.",
magicNumber, SASL_TRANSFER_MAGIC_NUMBER));
}
}

View File

@ -0,0 +1,439 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.protocol.datatransfer.sasl;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY;
import static org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil.*;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.Socket;
import java.util.Map;
import javax.security.auth.callback.Callback;
import javax.security.auth.callback.CallbackHandler;
import javax.security.auth.callback.NameCallback;
import javax.security.auth.callback.PasswordCallback;
import javax.security.auth.callback.UnsupportedCallbackException;
import javax.security.sasl.RealmCallback;
import javax.security.sasl.RealmChoiceCallback;
import org.apache.commons.codec.binary.Base64;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.net.EncryptedPeer;
import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
import org.apache.hadoop.security.SaslPropertiesResolver;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Charsets;
/**
* Negotiates SASL for DataTransferProtocol on behalf of a client. There are
* two possible supported variants of SASL negotiation: either a general-purpose
* negotiation supporting any quality of protection, or a specialized
* negotiation that enforces privacy as the quality of protection using a
* cryptographically strong encryption key.
*
* This class is used in both the HDFS client and the DataNode. The DataNode
* needs it, because it acts as a client to other DataNodes during write
* pipelines and block transfers.
*/
@InterfaceAudience.Private
public class SaslDataTransferClient {
private static final Logger LOG = LoggerFactory.getLogger(
SaslDataTransferClient.class);
private final boolean fallbackToSimpleAuthAllowed;
private final SaslPropertiesResolver saslPropsResolver;
private final TrustedChannelResolver trustedChannelResolver;
/**
* Creates a new SaslDataTransferClient.
*
* @param saslPropsResolver for determining properties of SASL negotiation
* @param trustedChannelResolver for identifying trusted connections that do
* not require SASL negotiation
*/
public SaslDataTransferClient(SaslPropertiesResolver saslPropsResolver,
TrustedChannelResolver trustedChannelResolver,
boolean fallbackToSimpleAuthAllowed) {
this.fallbackToSimpleAuthAllowed = fallbackToSimpleAuthAllowed;
this.saslPropsResolver = saslPropsResolver;
this.trustedChannelResolver = trustedChannelResolver;
}
/**
* Sends client SASL negotiation for a newly allocated socket if required.
*
* @param socket connection socket
* @param underlyingOut connection output stream
* @param underlyingIn connection input stream
* @param encryptionKeyFactory for creation of an encryption key
* @param accessToken connection block access token
* @param datanodeId ID of destination DataNode
* @return new pair of streams, wrapped after SASL negotiation
* @throws IOException for any error
*/
public IOStreamPair newSocketSend(Socket socket, OutputStream underlyingOut,
InputStream underlyingIn, DataEncryptionKeyFactory encryptionKeyFactory,
Token<BlockTokenIdentifier> accessToken, DatanodeID datanodeId)
throws IOException {
// The encryption key factory only returns a key if encryption is enabled.
DataEncryptionKey encryptionKey = !trustedChannelResolver.isTrusted() ?
encryptionKeyFactory.newDataEncryptionKey() : null;
IOStreamPair ios = send(socket.getInetAddress(), underlyingOut,
underlyingIn, encryptionKey, accessToken, datanodeId);
return ios != null ? ios : new IOStreamPair(underlyingIn, underlyingOut);
}
/**
* Sends client SASL negotiation for a peer if required.
*
* @param peer connection peer
* @param encryptionKeyFactory for creation of an encryption key
* @param accessToken connection block access token
* @param datanodeId ID of destination DataNode
* @return new pair of streams, wrapped after SASL negotiation
* @throws IOException for any error
*/
public Peer peerSend(Peer peer, DataEncryptionKeyFactory encryptionKeyFactory,
Token<BlockTokenIdentifier> accessToken, DatanodeID datanodeId)
throws IOException {
IOStreamPair ios = checkTrustAndSend(getPeerAddress(peer),
peer.getOutputStream(), peer.getInputStream(), encryptionKeyFactory,
accessToken, datanodeId);
// TODO: Consider renaming EncryptedPeer to SaslPeer.
return ios != null ? new EncryptedPeer(peer, ios) : peer;
}
/**
* Sends client SASL negotiation for a socket if required.
*
* @param socket connection socket
* @param underlyingOut connection output stream
* @param underlyingIn connection input stream
* @param encryptionKeyFactory for creation of an encryption key
* @param accessToken connection block access token
* @param datanodeId ID of destination DataNode
* @return new pair of streams, wrapped after SASL negotiation
* @throws IOException for any error
*/
public IOStreamPair socketSend(Socket socket, OutputStream underlyingOut,
InputStream underlyingIn, DataEncryptionKeyFactory encryptionKeyFactory,
Token<BlockTokenIdentifier> accessToken, DatanodeID datanodeId)
throws IOException {
IOStreamPair ios = checkTrustAndSend(socket.getInetAddress(), underlyingOut,
underlyingIn, encryptionKeyFactory, accessToken, datanodeId);
return ios != null ? ios : new IOStreamPair(underlyingIn, underlyingOut);
}
/**
* Checks if an address is already trusted and then sends client SASL
* negotiation if required.
*
* @param addr connection address
* @param underlyingOut connection output stream
* @param underlyingIn connection input stream
* @param encryptionKeyFactory for creation of an encryption key
* @param accessToken connection block access token
* @param datanodeId ID of destination DataNode
* @return new pair of streams, wrapped after SASL negotiation
* @throws IOException for any error
*/
private IOStreamPair checkTrustAndSend(InetAddress addr,
OutputStream underlyingOut, InputStream underlyingIn,
DataEncryptionKeyFactory encryptionKeyFactory,
Token<BlockTokenIdentifier> accessToken, DatanodeID datanodeId)
throws IOException {
if (!trustedChannelResolver.isTrusted() &&
!trustedChannelResolver.isTrusted(addr)) {
// The encryption key factory only returns a key if encryption is enabled.
DataEncryptionKey encryptionKey =
encryptionKeyFactory.newDataEncryptionKey();
return send(addr, underlyingOut, underlyingIn, encryptionKey, accessToken,
datanodeId);
} else {
LOG.debug(
"SASL client skipping handshake on trusted connection for addr = {}, "
+ "datanodeId = {}", addr, datanodeId);
return null;
}
}
/**
* Sends client SASL negotiation if required. Determines the correct type of
* SASL handshake based on configuration.
*
* @param addr connection address
* @param underlyingOut connection output stream
* @param underlyingIn connection input stream
* @param encryptionKey for an encrypted SASL handshake
* @param accessToken connection block access token
* @param datanodeId ID of destination DataNode
* @return new pair of streams, wrapped after SASL negotiation
* @throws IOException for any error
*/
private IOStreamPair send(InetAddress addr, OutputStream underlyingOut,
InputStream underlyingIn, DataEncryptionKey encryptionKey,
Token<BlockTokenIdentifier> accessToken, DatanodeID datanodeId)
throws IOException {
if (encryptionKey != null) {
LOG.debug(
"SASL client doing encrypted handshake for addr = {}, datanodeId = {}",
addr, datanodeId);
return getEncryptedStreams(underlyingOut, underlyingIn,
encryptionKey);
} else if (!UserGroupInformation.isSecurityEnabled()) {
LOG.debug(
"SASL client skipping handshake in unsecured configuration for "
+ "addr = {}, datanodeId = {}", addr, datanodeId);
return null;
} else if (datanodeId.getXferPort() < 1024) {
LOG.debug(
"SASL client skipping handshake in secured configuration with "
+ "privileged port for addr = {}, datanodeId = {}", addr, datanodeId);
return null;
} else if (accessToken.getIdentifier().length == 0) {
if (!fallbackToSimpleAuthAllowed) {
throw new IOException(
"No block access token was provided (insecure cluster), but this " +
"client is configured to allow only secure connections.");
}
LOG.debug(
"SASL client skipping handshake in secured configuration with "
+ "unsecured cluster for addr = {}, datanodeId = {}", addr, datanodeId);
return null;
} else {
LOG.debug(
"SASL client doing general handshake for addr = {}, datanodeId = {}",
addr, datanodeId);
return getSaslStreams(addr, underlyingOut, underlyingIn, accessToken,
datanodeId);
}
}
/**
* Sends client SASL negotiation for specialized encrypted handshake.
*
* @param underlyingOut connection output stream
* @param underlyingIn connection input stream
* @param encryptionKey for an encrypted SASL handshake
* @return new pair of streams, wrapped after SASL negotiation
* @throws IOException for any error
*/
private IOStreamPair getEncryptedStreams(OutputStream underlyingOut,
InputStream underlyingIn, DataEncryptionKey encryptionKey)
throws IOException {
Map<String, String> saslProps = createSaslPropertiesForEncryption(
encryptionKey.encryptionAlgorithm);
LOG.debug("Client using encryption algorithm {}",
encryptionKey.encryptionAlgorithm);
String userName = getUserNameFromEncryptionKey(encryptionKey);
char[] password = encryptionKeyToPassword(encryptionKey.encryptionKey);
CallbackHandler callbackHandler = new SaslClientCallbackHandler(userName,
password);
return doSaslHandshake(underlyingOut, underlyingIn, userName, saslProps,
callbackHandler);
}
/**
* The SASL username for an encrypted handshake consists of the keyId,
* blockPoolId, and nonce with the first two encoded as Strings, and the third
* encoded using Base64. The fields are each separated by a single space.
*
* @param encryptionKey the encryption key to encode as a SASL username.
* @return encoded username containing keyId, blockPoolId, and nonce
*/
private static String getUserNameFromEncryptionKey(
DataEncryptionKey encryptionKey) {
return encryptionKey.keyId + NAME_DELIMITER +
encryptionKey.blockPoolId + NAME_DELIMITER +
new String(Base64.encodeBase64(encryptionKey.nonce, false), Charsets.UTF_8);
}
/**
* Sets user name and password when asked by the client-side SASL object.
*/
private static final class SaslClientCallbackHandler
implements CallbackHandler {
private final char[] password;
private final String userName;
/**
* Creates a new SaslClientCallbackHandler.
*
* @param userName SASL user name
* @Param password SASL password
*/
public SaslClientCallbackHandler(String userName, char[] password) {
this.password = password;
this.userName = userName;
}
@Override
public void handle(Callback[] callbacks) throws IOException,
UnsupportedCallbackException {
NameCallback nc = null;
PasswordCallback pc = null;
RealmCallback rc = null;
for (Callback callback : callbacks) {
if (callback instanceof RealmChoiceCallback) {
continue;
} else if (callback instanceof NameCallback) {
nc = (NameCallback) callback;
} else if (callback instanceof PasswordCallback) {
pc = (PasswordCallback) callback;
} else if (callback instanceof RealmCallback) {
rc = (RealmCallback) callback;
} else {
throw new UnsupportedCallbackException(callback,
"Unrecognized SASL client callback");
}
}
if (nc != null) {
nc.setName(userName);
}
if (pc != null) {
pc.setPassword(password);
}
if (rc != null) {
rc.setText(rc.getDefaultText());
}
}
}
/**
* Sends client SASL negotiation for general-purpose handshake.
*
* @param addr connection address
* @param underlyingOut connection output stream
* @param underlyingIn connection input stream
* @param accessToken connection block access token
* @param datanodeId ID of destination DataNode
* @return new pair of streams, wrapped after SASL negotiation
* @throws IOException for any error
*/
private IOStreamPair getSaslStreams(InetAddress addr,
OutputStream underlyingOut, InputStream underlyingIn,
Token<BlockTokenIdentifier> accessToken, DatanodeID datanodeId)
throws IOException {
if (saslPropsResolver == null) {
throw new IOException(String.format("Cannot create a secured " +
"connection if DataNode listens on unprivileged port (%d) and no " +
"protection is defined in configuration property %s.",
datanodeId.getXferPort(), DFS_DATA_TRANSFER_PROTECTION_KEY));
}
Map<String, String> saslProps = saslPropsResolver.getClientProperties(addr);
String userName = buildUserName(accessToken);
char[] password = buildClientPassword(accessToken);
CallbackHandler callbackHandler = new SaslClientCallbackHandler(userName,
password);
return doSaslHandshake(underlyingOut, underlyingIn, userName, saslProps,
callbackHandler);
}
/**
* Builds the client's user name for the general-purpose handshake, consisting
* of the base64-encoded serialized block access token identifier. Note that
* this includes only the token identifier, not the token itself, which would
* include the password. The password is a shared secret, and we must not
* write it on the network during the SASL authentication exchange.
*
* @param blockToken for block access
* @return SASL user name
*/
private static String buildUserName(Token<BlockTokenIdentifier> blockToken) {
return new String(Base64.encodeBase64(blockToken.getIdentifier(), false),
Charsets.UTF_8);
}
/**
* Calculates the password on the client side for the general-purpose
* handshake. The password consists of the block access token's password.
*
* @param blockToken for block access
* @return SASL password
*/
private char[] buildClientPassword(Token<BlockTokenIdentifier> blockToken) {
return new String(Base64.encodeBase64(blockToken.getPassword(), false),
Charsets.UTF_8).toCharArray();
}
/**
* This method actually executes the client-side SASL handshake.
*
* @param underlyingOut connection output stream
* @param underlyingIn connection input stream
* @param userName SASL user name
* @param saslProps properties of SASL negotiation
* @param callbackHandler for responding to SASL callbacks
* @return new pair of streams, wrapped after SASL negotiation
* @throws IOException for any error
*/
private IOStreamPair doSaslHandshake(OutputStream underlyingOut,
InputStream underlyingIn, String userName, Map<String, String> saslProps,
CallbackHandler callbackHandler) throws IOException {
DataOutputStream out = new DataOutputStream(underlyingOut);
DataInputStream in = new DataInputStream(underlyingIn);
SaslParticipant sasl= SaslParticipant.createClientSaslParticipant(userName,
saslProps, callbackHandler);
out.writeInt(SASL_TRANSFER_MAGIC_NUMBER);
out.flush();
try {
// Start of handshake - "initial response" in SASL terminology.
sendSaslMessage(out, new byte[0]);
// step 1
performSaslStep1(out, in, sasl);
// step 2 (client-side only)
byte[] remoteResponse = readSaslMessage(in);
byte[] localResponse = sasl.evaluateChallengeOrResponse(remoteResponse);
assert localResponse == null;
// SASL handshake is complete
checkSaslComplete(sasl, saslProps);
return sasl.createStreamPair(out, in);
} catch (IOException ioe) {
sendGenericSaslErrorMessage(out, ioe.getMessage());
throw ioe;
}
}
}

View File

@ -0,0 +1,381 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.protocol.datatransfer.sasl;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY;
import static org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil.*;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Map;
import javax.security.auth.callback.Callback;
import javax.security.auth.callback.CallbackHandler;
import javax.security.auth.callback.NameCallback;
import javax.security.auth.callback.PasswordCallback;
import javax.security.auth.callback.UnsupportedCallbackException;
import javax.security.sasl.AuthorizeCallback;
import javax.security.sasl.RealmCallback;
import javax.security.sasl.SaslException;
import org.apache.commons.codec.binary.Base64;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto.DataTransferEncryptorStatus;
import org.apache.hadoop.hdfs.security.token.block.BlockPoolTokenSecretManager;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.datanode.DNConf;
import org.apache.hadoop.security.SaslPropertiesResolver;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Charsets;
/**
* Negotiates SASL for DataTransferProtocol on behalf of a server. There are
* two possible supported variants of SASL negotiation: either a general-purpose
* negotiation supporting any quality of protection, or a specialized
* negotiation that enforces privacy as the quality of protection using a
* cryptographically strong encryption key.
*
* This class is used in the DataNode for handling inbound connections.
*/
@InterfaceAudience.Private
public class SaslDataTransferServer {
private static final Logger LOG = LoggerFactory.getLogger(
SaslDataTransferServer.class);
private final BlockPoolTokenSecretManager blockPoolTokenSecretManager;
private final DNConf dnConf;
/**
* Creates a new SaslDataTransferServer.
*
* @param dnConf configuration of DataNode
* @param blockPoolTokenSecretManager used for checking block access tokens
* and encryption keys
*/
public SaslDataTransferServer(DNConf dnConf,
BlockPoolTokenSecretManager blockPoolTokenSecretManager) {
this.blockPoolTokenSecretManager = blockPoolTokenSecretManager;
this.dnConf = dnConf;
}
/**
* Receives SASL negotiation from a peer on behalf of a server.
*
* @param peer connection peer
* @param underlyingOut connection output stream
* @param underlyingIn connection input stream
* @param datanodeId ID of DataNode accepting connection
* @return new pair of streams, wrapped after SASL negotiation
* @throws IOException for any error
*/
public IOStreamPair receive(Peer peer, OutputStream underlyingOut,
InputStream underlyingIn, DatanodeID datanodeId) throws IOException {
if (dnConf.getEncryptDataTransfer()) {
LOG.debug(
"SASL server doing encrypted handshake for peer = {}, datanodeId = {}",
peer, datanodeId);
return getEncryptedStreams(peer, underlyingOut, underlyingIn);
} else if (!UserGroupInformation.isSecurityEnabled()) {
LOG.debug(
"SASL server skipping handshake in unsecured configuration for "
+ "peer = {}, datanodeId = {}", peer, datanodeId);
return new IOStreamPair(underlyingIn, underlyingOut);
} else if (datanodeId.getXferPort() < 1024) {
LOG.debug(
"SASL server skipping handshake in unsecured configuration for "
+ "peer = {}, datanodeId = {}", peer, datanodeId);
return new IOStreamPair(underlyingIn, underlyingOut);
} else {
LOG.debug(
"SASL server doing general handshake for peer = {}, datanodeId = {}",
peer, datanodeId);
return getSaslStreams(peer, underlyingOut, underlyingIn, datanodeId);
}
}
/**
* Receives SASL negotiation for specialized encrypted handshake.
*
* @param peer connection peer
* @param underlyingOut connection output stream
* @param underlyingIn connection input stream
* @return new pair of streams, wrapped after SASL negotiation
* @throws IOException for any error
*/
private IOStreamPair getEncryptedStreams(Peer peer,
OutputStream underlyingOut, InputStream underlyingIn) throws IOException {
if (peer.hasSecureChannel() ||
dnConf.getTrustedChannelResolver().isTrusted(getPeerAddress(peer))) {
return new IOStreamPair(underlyingIn, underlyingOut);
}
Map<String, String> saslProps = createSaslPropertiesForEncryption(
dnConf.getEncryptionAlgorithm());
if (LOG.isDebugEnabled()) {
LOG.debug("Server using encryption algorithm " +
dnConf.getEncryptionAlgorithm());
}
CallbackHandler callbackHandler = new SaslServerCallbackHandler(
new PasswordFunction() {
@Override
public char[] apply(String userName) throws IOException {
return encryptionKeyToPassword(getEncryptionKeyFromUserName(userName));
}
});
return doSaslHandshake(underlyingOut, underlyingIn, saslProps,
callbackHandler);
}
/**
* The SASL handshake for encrypted vs. general-purpose uses different logic
* for determining the password. This interface is used to parameterize that
* logic. It's similar to a Guava Function, but we need to let it throw
* exceptions.
*/
private interface PasswordFunction {
/**
* Returns the SASL password for the given user name.
*
* @param userName SASL user name
* @return SASL password
* @throws IOException for any error
*/
char[] apply(String userName) throws IOException;
}
/**
* Sets user name and password when asked by the server-side SASL object.
*/
private static final class SaslServerCallbackHandler
implements CallbackHandler {
private final PasswordFunction passwordFunction;
/**
* Creates a new SaslServerCallbackHandler.
*
* @param passwordFunction for determing the user's password
*/
public SaslServerCallbackHandler(PasswordFunction passwordFunction) {
this.passwordFunction = passwordFunction;
}
@Override
public void handle(Callback[] callbacks) throws IOException,
UnsupportedCallbackException {
NameCallback nc = null;
PasswordCallback pc = null;
AuthorizeCallback ac = null;
for (Callback callback : callbacks) {
if (callback instanceof AuthorizeCallback) {
ac = (AuthorizeCallback) callback;
} else if (callback instanceof PasswordCallback) {
pc = (PasswordCallback) callback;
} else if (callback instanceof NameCallback) {
nc = (NameCallback) callback;
} else if (callback instanceof RealmCallback) {
continue; // realm is ignored
} else {
throw new UnsupportedCallbackException(callback,
"Unrecognized SASL DIGEST-MD5 Callback: " + callback);
}
}
if (pc != null) {
pc.setPassword(passwordFunction.apply(nc.getDefaultName()));
}
if (ac != null) {
ac.setAuthorized(true);
ac.setAuthorizedID(ac.getAuthorizationID());
}
}
}
/**
* Given a secret manager and a username encoded for the encrypted handshake,
* determine the encryption key.
*
* @param userName containing the keyId, blockPoolId, and nonce.
* @return secret encryption key.
* @throws IOException
*/
private byte[] getEncryptionKeyFromUserName(String userName)
throws IOException {
String[] nameComponents = userName.split(NAME_DELIMITER);
if (nameComponents.length != 3) {
throw new IOException("Provided name '" + userName + "' has " +
nameComponents.length + " components instead of the expected 3.");
}
int keyId = Integer.parseInt(nameComponents[0]);
String blockPoolId = nameComponents[1];
byte[] nonce = Base64.decodeBase64(nameComponents[2]);
return blockPoolTokenSecretManager.retrieveDataEncryptionKey(keyId,
blockPoolId, nonce);
}
/**
* Receives SASL negotiation for general-purpose handshake.
*
* @param peer connection peer
* @param underlyingOut connection output stream
* @param underlyingIn connection input stream
* @param datanodeId ID of DataNode accepting connection
* @return new pair of streams, wrapped after SASL negotiation
* @throws IOException for any error
*/
private IOStreamPair getSaslStreams(Peer peer, OutputStream underlyingOut,
InputStream underlyingIn, final DatanodeID datanodeId) throws IOException {
SaslPropertiesResolver saslPropsResolver = dnConf.getSaslPropsResolver();
if (saslPropsResolver == null) {
throw new IOException(String.format("Cannot create a secured " +
"connection if DataNode listens on unprivileged port (%d) and no " +
"protection is defined in configuration property %s.",
datanodeId.getXferPort(), DFS_DATA_TRANSFER_PROTECTION_KEY));
}
Map<String, String> saslProps = saslPropsResolver.getServerProperties(
getPeerAddress(peer));
CallbackHandler callbackHandler = new SaslServerCallbackHandler(
new PasswordFunction() {
@Override
public char[] apply(String userName) throws IOException {
return buildServerPassword(userName);
}
});
return doSaslHandshake(underlyingOut, underlyingIn, saslProps,
callbackHandler);
}
/**
* Calculates the expected correct password on the server side for the
* general-purpose handshake. The password consists of the block access
* token's password (known to the DataNode via its secret manager). This
* expects that the client has supplied a user name consisting of its
* serialized block access token identifier.
*
* @param userName SASL user name containing serialized block access token
* identifier
* @return expected correct SASL password
* @throws IOException for any error
*/
private char[] buildServerPassword(String userName) throws IOException {
BlockTokenIdentifier identifier = deserializeIdentifier(userName);
byte[] tokenPassword = blockPoolTokenSecretManager.retrievePassword(
identifier);
return (new String(Base64.encodeBase64(tokenPassword, false),
Charsets.UTF_8)).toCharArray();
}
/**
* Deserializes a base64-encoded binary representation of a block access
* token.
*
* @param str String to deserialize
* @return BlockTokenIdentifier deserialized from str
* @throws IOException if there is any I/O error
*/
private BlockTokenIdentifier deserializeIdentifier(String str)
throws IOException {
BlockTokenIdentifier identifier = new BlockTokenIdentifier();
identifier.readFields(new DataInputStream(new ByteArrayInputStream(
Base64.decodeBase64(str))));
return identifier;
}
/**
* This method actually executes the server-side SASL handshake.
*
* @param underlyingOut connection output stream
* @param underlyingIn connection input stream
* @param saslProps properties of SASL negotiation
* @param callbackHandler for responding to SASL callbacks
* @return new pair of streams, wrapped after SASL negotiation
* @throws IOException for any error
*/
private IOStreamPair doSaslHandshake(OutputStream underlyingOut,
InputStream underlyingIn, Map<String, String> saslProps,
CallbackHandler callbackHandler) throws IOException {
DataInputStream in = new DataInputStream(underlyingIn);
DataOutputStream out = new DataOutputStream(underlyingOut);
SaslParticipant sasl = SaslParticipant.createServerSaslParticipant(saslProps,
callbackHandler);
int magicNumber = in.readInt();
if (magicNumber != SASL_TRANSFER_MAGIC_NUMBER) {
throw new InvalidMagicNumberException(magicNumber);
}
try {
// step 1
performSaslStep1(out, in, sasl);
// step 2 (server-side only)
byte[] remoteResponse = readSaslMessage(in);
byte[] localResponse = sasl.evaluateChallengeOrResponse(remoteResponse);
sendSaslMessage(out, localResponse);
// SASL handshake is complete
checkSaslComplete(sasl, saslProps);
return sasl.createStreamPair(out, in);
} catch (IOException ioe) {
if (ioe instanceof SaslException &&
ioe.getCause() != null &&
ioe.getCause() instanceof InvalidEncryptionKeyException) {
// This could just be because the client is long-lived and hasn't gotten
// a new encryption key from the NN in a while. Upon receiving this
// error, the client will get a new encryption key from the NN and retry
// connecting to this DN.
sendInvalidKeySaslErrorMessage(out, ioe.getCause().getMessage());
} else {
sendGenericSaslErrorMessage(out, ioe.getMessage());
}
throw ioe;
}
}
/**
* Sends a SASL negotiation message indicating an invalid key error.
*
* @param out stream to receive message
* @param message to send
* @throws IOException for any error
*/
private static void sendInvalidKeySaslErrorMessage(DataOutputStream out,
String message) throws IOException {
sendSaslMessage(out, DataTransferEncryptorStatus.ERROR_UNKNOWN_KEY, null,
message);
}
}

View File

@ -0,0 +1,166 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.protocol.datatransfer.sasl;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.util.Map;
import javax.security.auth.callback.CallbackHandler;
import javax.security.sasl.Sasl;
import javax.security.sasl.SaslClient;
import javax.security.sasl.SaslException;
import javax.security.sasl.SaslServer;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
import org.apache.hadoop.security.SaslInputStream;
import org.apache.hadoop.security.SaslOutputStream;
/**
* Strongly inspired by Thrift's TSaslTransport class.
*
* Used to abstract over the <code>SaslServer</code> and
* <code>SaslClient</code> classes, which share a lot of their interface, but
* unfortunately don't share a common superclass.
*/
@InterfaceAudience.Private
class SaslParticipant {
// This has to be set as part of the SASL spec, but it don't matter for
// our purposes, but may not be empty. It's sent over the wire, so use
// a short string.
private static final String SERVER_NAME = "0";
private static final String PROTOCOL = "hdfs";
private static final String MECHANISM = "DIGEST-MD5";
// One of these will always be null.
private final SaslServer saslServer;
private final SaslClient saslClient;
/**
* Creates a SaslParticipant wrapping a SaslServer.
*
* @param saslProps properties of SASL negotiation
* @param callbackHandler for handling all SASL callbacks
* @return SaslParticipant wrapping SaslServer
* @throws SaslException for any error
*/
public static SaslParticipant createServerSaslParticipant(
Map<String, String> saslProps, CallbackHandler callbackHandler)
throws SaslException {
return new SaslParticipant(Sasl.createSaslServer(MECHANISM,
PROTOCOL, SERVER_NAME, saslProps, callbackHandler));
}
/**
* Creates a SaslParticipant wrapping a SaslClient.
*
* @param userName SASL user name
* @param saslProps properties of SASL negotiation
* @param callbackHandler for handling all SASL callbacks
* @return SaslParticipant wrapping SaslClient
* @throws SaslException for any error
*/
public static SaslParticipant createClientSaslParticipant(String userName,
Map<String, String> saslProps, CallbackHandler callbackHandler)
throws SaslException {
return new SaslParticipant(Sasl.createSaslClient(new String[] { MECHANISM },
userName, PROTOCOL, SERVER_NAME, saslProps, callbackHandler));
}
/**
* Private constructor wrapping a SaslServer.
*
* @param saslServer to wrap
*/
private SaslParticipant(SaslServer saslServer) {
this.saslServer = saslServer;
this.saslClient = null;
}
/**
* Private constructor wrapping a SaslClient.
*
* @param saslClient to wrap
*/
private SaslParticipant(SaslClient saslClient) {
this.saslServer = null;
this.saslClient = saslClient;
}
/**
* @see {@link SaslServer#evaluateResponse}
* @see {@link SaslClient#evaluateChallenge}
*/
public byte[] evaluateChallengeOrResponse(byte[] challengeOrResponse)
throws SaslException {
if (saslClient != null) {
return saslClient.evaluateChallenge(challengeOrResponse);
} else {
return saslServer.evaluateResponse(challengeOrResponse);
}
}
/**
* After successful SASL negotation, returns the negotiated quality of
* protection.
*
* @return negotiated quality of protection
*/
public String getNegotiatedQop() {
if (saslClient != null) {
return (String) saslClient.getNegotiatedProperty(Sasl.QOP);
} else {
return (String) saslServer.getNegotiatedProperty(Sasl.QOP);
}
}
/**
* Returns true if SASL negotiation is complete.
*
* @return true if SASL negotiation is complete
*/
public boolean isComplete() {
if (saslClient != null) {
return saslClient.isComplete();
} else {
return saslServer.isComplete();
}
}
/**
* Return some input/output streams that may henceforth have their
* communication encrypted, depending on the negotiated quality of protection.
*
* @param out output stream to wrap
* @param in input stream to wrap
* @return IOStreamPair wrapping the streams
*/
public IOStreamPair createStreamPair(DataOutputStream out,
DataInputStream in) {
if (saslClient != null) {
return new IOStreamPair(
new SaslInputStream(in, saslClient),
new SaslOutputStream(out, saslClient));
} else {
return new IOStreamPair(
new SaslInputStream(in, saslServer),
new SaslOutputStream(out, saslServer));
}
}
}

View File

@ -18,6 +18,8 @@
package org.apache.hadoop.hdfs.server.balancer; package org.apache.hadoop.hdfs.server.balancer;
import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT;
import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY;
import static org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed; import static org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed;
import java.io.BufferedInputStream; import java.io.BufferedInputStream;
@ -62,9 +64,11 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor;
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair; import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender; import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver;
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil;
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
@ -202,6 +206,7 @@ public class Balancer {
private final NameNodeConnector nnc; private final NameNodeConnector nnc;
private final BalancingPolicy policy; private final BalancingPolicy policy;
private final SaslDataTransferClient saslClient;
private final double threshold; private final double threshold;
// all data node lists // all data node lists
@ -352,19 +357,18 @@ public class Balancer {
OutputStream unbufOut = sock.getOutputStream(); OutputStream unbufOut = sock.getOutputStream();
InputStream unbufIn = sock.getInputStream(); InputStream unbufIn = sock.getInputStream();
if (nnc.getDataEncryptionKey() != null) { ExtendedBlock eb = new ExtendedBlock(nnc.blockpoolID, block.getBlock());
IOStreamPair encryptedStreams = Token<BlockTokenIdentifier> accessToken = nnc.getAccessToken(eb);
DataTransferEncryptor.getEncryptedStreams( IOStreamPair saslStreams = saslClient.socketSend(sock, unbufOut,
unbufOut, unbufIn, nnc.getDataEncryptionKey()); unbufIn, nnc, accessToken, target.datanode);
unbufOut = encryptedStreams.out; unbufOut = saslStreams.out;
unbufIn = encryptedStreams.in; unbufIn = saslStreams.in;
}
out = new DataOutputStream(new BufferedOutputStream(unbufOut, out = new DataOutputStream(new BufferedOutputStream(unbufOut,
HdfsConstants.IO_FILE_BUFFER_SIZE)); HdfsConstants.IO_FILE_BUFFER_SIZE));
in = new DataInputStream(new BufferedInputStream(unbufIn, in = new DataInputStream(new BufferedInputStream(unbufIn,
HdfsConstants.IO_FILE_BUFFER_SIZE)); HdfsConstants.IO_FILE_BUFFER_SIZE));
sendRequest(out); sendRequest(out, eb, accessToken);
receiveResponse(in); receiveResponse(in);
bytesMoved.addAndGet(block.getNumBytes()); bytesMoved.addAndGet(block.getNumBytes());
LOG.info("Successfully moved " + this); LOG.info("Successfully moved " + this);
@ -395,9 +399,8 @@ public class Balancer {
} }
/* Send a block replace request to the output stream*/ /* Send a block replace request to the output stream*/
private void sendRequest(DataOutputStream out) throws IOException { private void sendRequest(DataOutputStream out, ExtendedBlock eb,
final ExtendedBlock eb = new ExtendedBlock(nnc.blockpoolID, block.getBlock()); Token<BlockTokenIdentifier> accessToken) throws IOException {
final Token<BlockTokenIdentifier> accessToken = nnc.getAccessToken(eb);
new Sender(out).replaceBlock(eb, accessToken, new Sender(out).replaceBlock(eb, accessToken,
source.getStorageID(), proxySource.getDatanode()); source.getStorageID(), proxySource.getDatanode());
} }
@ -876,6 +879,12 @@ public class Balancer {
this.maxConcurrentMovesPerNode = this.maxConcurrentMovesPerNode =
conf.getInt(DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY, conf.getInt(DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT); DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT);
this.saslClient = new SaslDataTransferClient(
DataTransferSaslUtil.getSaslPropertiesResolver(conf),
TrustedChannelResolver.getInstance(conf),
conf.getBoolean(
IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY,
IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT));
} }
/* Given a data node set, build a network topology and decide /* Given a data node set, build a network topology and decide

View File

@ -34,8 +34,8 @@ import org.apache.hadoop.hdfs.NameNodeProxies;
import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException; import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
import org.apache.hadoop.hdfs.protocol.ClientProtocol; import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory;
import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey; import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager; import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys; import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
@ -50,7 +50,7 @@ import org.apache.hadoop.util.Daemon;
* The class provides utilities for {@link Balancer} to access a NameNode * The class provides utilities for {@link Balancer} to access a NameNode
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
class NameNodeConnector { class NameNodeConnector implements DataEncryptionKeyFactory {
private static final Log LOG = Balancer.LOG; private static final Log LOG = Balancer.LOG;
private static final Path BALANCER_ID_PATH = new Path("/system/balancer.id"); private static final Path BALANCER_ID_PATH = new Path("/system/balancer.id");
private static final int MAX_NOT_CHANGED_ITERATIONS = 5; private static final int MAX_NOT_CHANGED_ITERATIONS = 5;
@ -72,7 +72,6 @@ class NameNodeConnector {
private BlockTokenSecretManager blockTokenSecretManager; private BlockTokenSecretManager blockTokenSecretManager;
private Daemon keyupdaterthread; // AccessKeyUpdater thread private Daemon keyupdaterthread; // AccessKeyUpdater thread
private DataEncryptionKey encryptionKey; private DataEncryptionKey encryptionKey;
private final TrustedChannelResolver trustedChannelResolver;
NameNodeConnector(URI nameNodeUri, NameNodeConnector(URI nameNodeUri,
Configuration conf) throws IOException { Configuration conf) throws IOException {
@ -122,7 +121,6 @@ class NameNodeConnector {
if (out == null) { if (out == null) {
throw new IOException("Another balancer is running"); throw new IOException("Another balancer is running");
} }
this.trustedChannelResolver = TrustedChannelResolver.getInstance(conf);
} }
boolean shouldContinue(long dispatchBlockMoveBytes) { boolean shouldContinue(long dispatchBlockMoveBytes) {
@ -155,9 +153,9 @@ class NameNodeConnector {
} }
} }
DataEncryptionKey getDataEncryptionKey() @Override
throws IOException { public DataEncryptionKey newDataEncryptionKey() {
if (encryptDataTransfer && !this.trustedChannelResolver.isTrusted()) { if (encryptDataTransfer) {
synchronized (this) { synchronized (this) {
if (encryptionKey == null) { if (encryptionKey == null) {
encryptionKey = blockTokenSecretManager.generateDataEncryptionKey(); encryptionKey = blockTokenSecretManager.generateDataEncryptionKey();

View File

@ -58,8 +58,8 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy; import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
@ -211,14 +211,16 @@ public class JspHelper {
} }
public static void streamBlockInAscii(InetSocketAddress addr, String poolId, public static void streamBlockInAscii(InetSocketAddress addr, String poolId,
long blockId, Token<BlockTokenIdentifier> blockToken, long genStamp, long blockId, final Token<BlockTokenIdentifier> blockToken, long genStamp,
long blockSize, long offsetIntoBlock, long chunkSizeToView, long blockSize, long offsetIntoBlock, long chunkSizeToView,
JspWriter out, final Configuration conf, DFSClient.Conf dfsConf, JspWriter out, final Configuration conf, DFSClient.Conf dfsConf,
final DataEncryptionKey encryptionKey) final DFSClient dfs, final SaslDataTransferClient saslClient)
throws IOException { throws IOException {
if (chunkSizeToView == 0) return; if (chunkSizeToView == 0) return;
int amtToRead = (int)Math.min(chunkSizeToView, blockSize - offsetIntoBlock); int amtToRead = (int)Math.min(chunkSizeToView, blockSize - offsetIntoBlock);
DatanodeID datanodeId = new DatanodeID(addr.getAddress().getHostAddress(),
addr.getHostName(), poolId, addr.getPort(), 0, 0, 0);
BlockReader blockReader = new BlockReaderFactory(dfsConf). BlockReader blockReader = new BlockReaderFactory(dfsConf).
setInetSocketAddress(addr). setInetSocketAddress(addr).
setBlock(new ExtendedBlock(poolId, blockId, 0, genStamp)). setBlock(new ExtendedBlock(poolId, blockId, 0, genStamp)).
@ -229,21 +231,21 @@ public class JspHelper {
setVerifyChecksum(true). setVerifyChecksum(true).
setClientName("JspHelper"). setClientName("JspHelper").
setClientCacheContext(ClientContext.getFromConf(conf)). setClientCacheContext(ClientContext.getFromConf(conf)).
setDatanodeInfo(new DatanodeInfo( setDatanodeInfo(new DatanodeInfo(datanodeId)).
new DatanodeID(addr.getAddress().getHostAddress(),
addr.getHostName(), poolId, addr.getPort(), 0, 0, 0))).
setCachingStrategy(CachingStrategy.newDefaultStrategy()). setCachingStrategy(CachingStrategy.newDefaultStrategy()).
setConfiguration(conf). setConfiguration(conf).
setRemotePeerFactory(new RemotePeerFactory() { setRemotePeerFactory(new RemotePeerFactory() {
@Override @Override
public Peer newConnectedPeer(InetSocketAddress addr) public Peer newConnectedPeer(InetSocketAddress addr,
Token<BlockTokenIdentifier> blockToken, DatanodeID datanodeId)
throws IOException { throws IOException {
Peer peer = null; Peer peer = null;
Socket sock = NetUtils.getDefaultSocketFactory(conf).createSocket(); Socket sock = NetUtils.getDefaultSocketFactory(conf).createSocket();
try { try {
sock.connect(addr, HdfsServerConstants.READ_TIMEOUT); sock.connect(addr, HdfsServerConstants.READ_TIMEOUT);
sock.setSoTimeout(HdfsServerConstants.READ_TIMEOUT); sock.setSoTimeout(HdfsServerConstants.READ_TIMEOUT);
peer = TcpPeerServer.peerFromSocketAndKey(sock, encryptionKey); peer = TcpPeerServer.peerFromSocketAndKey(saslClient, sock, dfs,
blockToken, datanodeId);
} finally { } finally {
if (peer == null) { if (peer == null) {
IOUtils.closeSocket(sock); IOUtils.closeSocket(sock);

View File

@ -52,7 +52,9 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_RESTART_REPLICA_
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver; import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver;
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.security.SaslPropertiesResolver;
/** /**
* Simple class encapsulating all of the configuration that the DataNode * Simple class encapsulating all of the configuration that the DataNode
@ -86,6 +88,7 @@ public class DNConf {
final String minimumNameNodeVersion; final String minimumNameNodeVersion;
final String encryptionAlgorithm; final String encryptionAlgorithm;
final SaslPropertiesResolver saslPropsResolver;
final TrustedChannelResolver trustedChannelResolver; final TrustedChannelResolver trustedChannelResolver;
final long xceiverStopTimeout; final long xceiverStopTimeout;
@ -168,6 +171,8 @@ public class DNConf {
DFS_ENCRYPT_DATA_TRANSFER_DEFAULT); DFS_ENCRYPT_DATA_TRANSFER_DEFAULT);
this.encryptionAlgorithm = conf.get(DFS_DATA_ENCRYPTION_ALGORITHM_KEY); this.encryptionAlgorithm = conf.get(DFS_DATA_ENCRYPTION_ALGORITHM_KEY);
this.trustedChannelResolver = TrustedChannelResolver.getInstance(conf); this.trustedChannelResolver = TrustedChannelResolver.getInstance(conf);
this.saslPropsResolver = DataTransferSaslUtil.getSaslPropertiesResolver(
conf);
this.xceiverStopTimeout = conf.getLong( this.xceiverStopTimeout = conf.getLong(
DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_KEY, DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_KEY,
@ -187,6 +192,25 @@ public class DNConf {
return this.minimumNameNodeVersion; return this.minimumNameNodeVersion;
} }
/**
* Returns true if encryption enabled for DataTransferProtocol.
*
* @return boolean true if encryption enabled for DataTransferProtocol
*/
public boolean getEncryptDataTransfer() {
return encryptDataTransfer;
}
/**
* Returns encryption algorithm configured for DataTransferProtocol, or null
* if not configured.
*
* @return encryption algorithm configured for DataTransferProtocol
*/
public String getEncryptionAlgorithm() {
return encryptionAlgorithm;
}
public long getXceiverStopTimeout() { public long getXceiverStopTimeout() {
return xceiverStopTimeout; return xceiverStopTimeout;
} }
@ -194,4 +218,24 @@ public class DNConf {
public long getMaxLockedMemory() { public long getMaxLockedMemory() {
return maxLockedMemory; return maxLockedMemory;
} }
/**
* Returns the SaslPropertiesResolver configured for use with
* DataTransferProtocol, or null if not configured.
*
* @return SaslPropertiesResolver configured for use with DataTransferProtocol
*/
public SaslPropertiesResolver getSaslPropsResolver() {
return saslPropsResolver;
}
/**
* Returns the TrustedChannelResolver configured for use with
* DataTransferProtocol, or null if not configured.
*
* @return TrustedChannelResolver configured for use with DataTransferProtocol
*/
public TrustedChannelResolver getTrustedChannelResolver() {
return trustedChannelResolver;
}
} }

View File

@ -17,6 +17,9 @@
*/ */
package org.apache.hadoop.hdfs.server.datanode; package org.apache.hadoop.hdfs.server.datanode;
import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT;
import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner; import com.google.common.base.Joiner;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
@ -41,6 +44,9 @@ import org.apache.hadoop.hdfs.net.DomainPeerServer;
import org.apache.hadoop.hdfs.net.TcpPeerServer; import org.apache.hadoop.hdfs.net.TcpPeerServer;
import org.apache.hadoop.hdfs.protocol.*; import org.apache.hadoop.hdfs.protocol.*;
import org.apache.hadoop.hdfs.protocol.datatransfer.*; import org.apache.hadoop.hdfs.protocol.datatransfer.*;
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory;
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient;
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferServer;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.ClientDatanodeProtocolService; import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.ClientDatanodeProtocolService;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DNTransferAckProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DNTransferAckProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
@ -227,6 +233,8 @@ public class DataNode extends Configured
private final List<String> usersWithLocalPathAccess; private final List<String> usersWithLocalPathAccess;
private final boolean connectToDnViaHostname; private final boolean connectToDnViaHostname;
ReadaheadPool readaheadPool; ReadaheadPool readaheadPool;
SaslDataTransferClient saslClient;
SaslDataTransferServer saslServer;
private final boolean getHdfsBlockLocationsEnabled; private final boolean getHdfsBlockLocationsEnabled;
private ObjectName dataNodeInfoBeanName; private ObjectName dataNodeInfoBeanName;
private Thread checkDiskErrorThread = null; private Thread checkDiskErrorThread = null;
@ -729,15 +737,10 @@ public class DataNode extends Configured
*/ */
void startDataNode(Configuration conf, void startDataNode(Configuration conf,
List<StorageLocation> dataDirs, List<StorageLocation> dataDirs,
// DatanodeProtocol namenode,
SecureResources resources SecureResources resources
) throws IOException { ) throws IOException {
if(UserGroupInformation.isSecurityEnabled() && resources == null) {
if (!conf.getBoolean("ignore.secure.ports.for.testing", false)) { checkSecureConfig(conf, resources);
throw new RuntimeException("Cannot start secure cluster without "
+ "privileged resources.");
}
}
// settings global for all BPs in the Data Node // settings global for all BPs in the Data Node
this.secureResources = resources; this.secureResources = resources;
@ -797,6 +800,55 @@ public class DataNode extends Configured
// Create the ReadaheadPool from the DataNode context so we can // Create the ReadaheadPool from the DataNode context so we can
// exit without having to explicitly shutdown its thread pool. // exit without having to explicitly shutdown its thread pool.
readaheadPool = ReadaheadPool.getInstance(); readaheadPool = ReadaheadPool.getInstance();
saslClient = new SaslDataTransferClient(dnConf.saslPropsResolver,
dnConf.trustedChannelResolver,
conf.getBoolean(
IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY,
IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT));
saslServer = new SaslDataTransferServer(dnConf, blockPoolTokenSecretManager);
}
/**
* Checks if the DataNode has a secure configuration if security is enabled.
* There are 2 possible configurations that are considered secure:
* 1. The server has bound to privileged ports for RPC and HTTP via
* SecureDataNodeStarter.
* 2. The configuration enables SASL on DataTransferProtocol and HTTPS (no
* plain HTTP) for the HTTP server. The SASL handshake guarantees
* authentication of the RPC server before a client transmits a secret, such
* as a block access token. Similarly, SSL guarantees authentication of the
* HTTP server before a client transmits a secret, such as a delegation
* token.
* It is not possible to run with both privileged ports and SASL on
* DataTransferProtocol. For backwards-compatibility, the connection logic
* must check if the target port is a privileged port, and if so, skip the
* SASL handshake.
*
* @param conf Configuration to check
* @param resources SecuredResources obtained for DataNode
* @throws RuntimeException if security enabled, but configuration is insecure
*/
private static void checkSecureConfig(Configuration conf,
SecureResources resources) throws RuntimeException {
if (!UserGroupInformation.isSecurityEnabled()) {
return;
}
String dataTransferProtection = conf.get(DFS_DATA_TRANSFER_PROTECTION_KEY);
if (resources != null && dataTransferProtection == null) {
return;
}
if (conf.getBoolean("ignore.secure.ports.for.testing", false)) {
return;
}
if (dataTransferProtection != null &&
DFSUtil.getHttpPolicy(conf) == HttpConfig.Policy.HTTPS_ONLY &&
resources == null) {
return;
}
throw new RuntimeException("Cannot start secure DataNode without " +
"configuring either privileged resources or SASL RPC data transfer " +
"protection and SSL for HTTP. Using privileged resources in " +
"combination with SASL RPC data transfer protection is not supported.");
} }
public static String generateUuid() { public static String generateUuid() {
@ -1630,28 +1682,6 @@ public class DataNode extends Configured
NetUtils.connect(sock, curTarget, dnConf.socketTimeout); NetUtils.connect(sock, curTarget, dnConf.socketTimeout);
sock.setSoTimeout(targets.length * dnConf.socketTimeout); sock.setSoTimeout(targets.length * dnConf.socketTimeout);
long writeTimeout = dnConf.socketWriteTimeout +
HdfsServerConstants.WRITE_TIMEOUT_EXTENSION * (targets.length-1);
OutputStream unbufOut = NetUtils.getOutputStream(sock, writeTimeout);
InputStream unbufIn = NetUtils.getInputStream(sock);
if (dnConf.encryptDataTransfer &&
!dnConf.trustedChannelResolver.isTrusted(sock.getInetAddress())) {
IOStreamPair encryptedStreams =
DataTransferEncryptor.getEncryptedStreams(
unbufOut, unbufIn,
blockPoolTokenSecretManager.generateDataEncryptionKey(
b.getBlockPoolId()));
unbufOut = encryptedStreams.out;
unbufIn = encryptedStreams.in;
}
out = new DataOutputStream(new BufferedOutputStream(unbufOut,
HdfsConstants.SMALL_BUFFER_SIZE));
in = new DataInputStream(unbufIn);
blockSender = new BlockSender(b, 0, b.getNumBytes(),
false, false, true, DataNode.this, null, cachingStrategy);
DatanodeInfo srcNode = new DatanodeInfo(bpReg);
// //
// Header info // Header info
// //
@ -1661,6 +1691,24 @@ public class DataNode extends Configured
EnumSet.of(BlockTokenSecretManager.AccessMode.WRITE)); EnumSet.of(BlockTokenSecretManager.AccessMode.WRITE));
} }
long writeTimeout = dnConf.socketWriteTimeout +
HdfsServerConstants.WRITE_TIMEOUT_EXTENSION * (targets.length-1);
OutputStream unbufOut = NetUtils.getOutputStream(sock, writeTimeout);
InputStream unbufIn = NetUtils.getInputStream(sock);
DataEncryptionKeyFactory keyFactory =
getDataEncryptionKeyFactoryForBlock(b);
IOStreamPair saslStreams = saslClient.socketSend(sock, unbufOut,
unbufIn, keyFactory, accessToken, bpReg);
unbufOut = saslStreams.out;
unbufIn = saslStreams.in;
out = new DataOutputStream(new BufferedOutputStream(unbufOut,
HdfsConstants.SMALL_BUFFER_SIZE));
in = new DataInputStream(unbufIn);
blockSender = new BlockSender(b, 0, b.getNumBytes(),
false, false, true, DataNode.this, null, cachingStrategy);
DatanodeInfo srcNode = new DatanodeInfo(bpReg);
new Sender(out).writeBlock(b, accessToken, clientname, targets, srcNode, new Sender(out).writeBlock(b, accessToken, clientname, targets, srcNode,
stage, 0, 0, 0, 0, blockSender.getChecksum(), cachingStrategy); stage, 0, 0, 0, 0, blockSender.getChecksum(), cachingStrategy);
@ -1704,6 +1752,25 @@ public class DataNode extends Configured
} }
} }
/**
* Returns a new DataEncryptionKeyFactory that generates a key from the
* BlockPoolTokenSecretManager, using the block pool ID of the given block.
*
* @param block for which the factory needs to create a key
* @return DataEncryptionKeyFactory for block's block pool ID
*/
DataEncryptionKeyFactory getDataEncryptionKeyFactoryForBlock(
final ExtendedBlock block) {
return new DataEncryptionKeyFactory() {
@Override
public DataEncryptionKey newDataEncryptionKey() {
return dnConf.encryptDataTransfer ?
blockPoolTokenSecretManager.generateDataEncryptionKey(
block.getBlockPoolId()) : null;
}
};
}
/** /**
* After a block becomes finalized, a datanode increases metric counter, * After a block becomes finalized, a datanode increases metric counter,
* notifies namenode, and adds it to the block scanner * notifies namenode, and adds it to the block scanner

View File

@ -36,11 +36,9 @@ import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.InterruptedIOException; import java.io.InterruptedIOException;
import java.io.OutputStream; import java.io.OutputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.Socket; import java.net.Socket;
import java.net.SocketException; import java.net.SocketException;
import java.net.UnknownHostException;
import java.nio.channels.ClosedChannelException; import java.nio.channels.ClosedChannelException;
import java.security.MessageDigest; import java.security.MessageDigest;
import java.util.Arrays; import java.util.Arrays;
@ -52,13 +50,12 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage; import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor.InvalidMagicNumberException;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil; import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair; import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
import org.apache.hadoop.hdfs.protocol.datatransfer.Op; import org.apache.hadoop.hdfs.protocol.datatransfer.Op;
import org.apache.hadoop.hdfs.protocol.datatransfer.Receiver; import org.apache.hadoop.hdfs.protocol.datatransfer.Receiver;
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender; import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientReadStatusProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientReadStatusProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumResponseProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumResponseProto;
@ -85,7 +82,6 @@ import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.DataChecksum; import org.apache.hadoop.util.DataChecksum;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.net.InetAddresses;
import com.google.protobuf.ByteString; import com.google.protobuf.ByteString;
@ -174,24 +170,11 @@ class DataXceiver extends Receiver implements Runnable {
dataXceiverServer.addPeer(peer, Thread.currentThread()); dataXceiverServer.addPeer(peer, Thread.currentThread());
peer.setWriteTimeout(datanode.getDnConf().socketWriteTimeout); peer.setWriteTimeout(datanode.getDnConf().socketWriteTimeout);
InputStream input = socketIn; InputStream input = socketIn;
if ((!peer.hasSecureChannel()) && dnConf.encryptDataTransfer && IOStreamPair saslStreams = datanode.saslServer.receive(peer, socketOut,
!dnConf.trustedChannelResolver.isTrusted(getClientAddress(peer))){ socketIn, datanode.getDatanodeId());
IOStreamPair encryptedStreams = null; input = new BufferedInputStream(saslStreams.in,
try { HdfsConstants.SMALL_BUFFER_SIZE);
encryptedStreams = DataTransferEncryptor.getEncryptedStreams(socketOut, socketOut = saslStreams.out;
socketIn, datanode.blockPoolTokenSecretManager,
dnConf.encryptionAlgorithm);
} catch (InvalidMagicNumberException imne) {
LOG.info("Failed to read expected encryption handshake from client " +
"at " + peer.getRemoteAddressString() + ". Perhaps the client " +
"is running an older version of Hadoop which does not support " +
"encryption");
return;
}
input = encryptedStreams.in;
socketOut = encryptedStreams.out;
}
input = new BufferedInputStream(input, HdfsConstants.SMALL_BUFFER_SIZE);
super.initialize(new DataInputStream(input)); super.initialize(new DataInputStream(input));
@ -264,19 +247,6 @@ class DataXceiver extends Receiver implements Runnable {
} }
} }
/**
* Returns InetAddress from peer
* The getRemoteAddressString is the form /ip-address:port
* The ip-address is extracted from peer and InetAddress is formed
* @param peer
* @return
* @throws UnknownHostException
*/
private static InetAddress getClientAddress(Peer peer) {
return InetAddresses.forString(
peer.getRemoteAddressString().split(":")[0].substring(1));
}
@Override @Override
public void requestShortCircuitFds(final ExtendedBlock blk, public void requestShortCircuitFds(final ExtendedBlock blk,
final Token<BlockTokenIdentifier> token, final Token<BlockTokenIdentifier> token,
@ -656,17 +626,12 @@ class DataXceiver extends Receiver implements Runnable {
OutputStream unbufMirrorOut = NetUtils.getOutputStream(mirrorSock, OutputStream unbufMirrorOut = NetUtils.getOutputStream(mirrorSock,
writeTimeout); writeTimeout);
InputStream unbufMirrorIn = NetUtils.getInputStream(mirrorSock); InputStream unbufMirrorIn = NetUtils.getInputStream(mirrorSock);
if (dnConf.encryptDataTransfer && DataEncryptionKeyFactory keyFactory =
!dnConf.trustedChannelResolver.isTrusted(mirrorSock.getInetAddress())) { datanode.getDataEncryptionKeyFactoryForBlock(block);
IOStreamPair encryptedStreams = IOStreamPair saslStreams = datanode.saslClient.socketSend(mirrorSock,
DataTransferEncryptor.getEncryptedStreams( unbufMirrorOut, unbufMirrorIn, keyFactory, blockToken, targets[0]);
unbufMirrorOut, unbufMirrorIn, unbufMirrorOut = saslStreams.out;
datanode.blockPoolTokenSecretManager unbufMirrorIn = saslStreams.in;
.generateDataEncryptionKey(block.getBlockPoolId()));
unbufMirrorOut = encryptedStreams.out;
unbufMirrorIn = encryptedStreams.in;
}
mirrorOut = new DataOutputStream(new BufferedOutputStream(unbufMirrorOut, mirrorOut = new DataOutputStream(new BufferedOutputStream(unbufMirrorOut,
HdfsConstants.SMALL_BUFFER_SIZE)); HdfsConstants.SMALL_BUFFER_SIZE));
mirrorIn = new DataInputStream(unbufMirrorIn); mirrorIn = new DataInputStream(unbufMirrorIn);
@ -1026,17 +991,12 @@ class DataXceiver extends Receiver implements Runnable {
OutputStream unbufProxyOut = NetUtils.getOutputStream(proxySock, OutputStream unbufProxyOut = NetUtils.getOutputStream(proxySock,
dnConf.socketWriteTimeout); dnConf.socketWriteTimeout);
InputStream unbufProxyIn = NetUtils.getInputStream(proxySock); InputStream unbufProxyIn = NetUtils.getInputStream(proxySock);
if (dnConf.encryptDataTransfer && DataEncryptionKeyFactory keyFactory =
!dnConf.trustedChannelResolver.isTrusted( datanode.getDataEncryptionKeyFactoryForBlock(block);
proxySock.getInetAddress())) { IOStreamPair saslStreams = datanode.saslClient.socketSend(proxySock,
IOStreamPair encryptedStreams = unbufProxyOut, unbufProxyIn, keyFactory, blockToken, proxySource);
DataTransferEncryptor.getEncryptedStreams( unbufProxyOut = saslStreams.out;
unbufProxyOut, unbufProxyIn, unbufProxyIn = saslStreams.in;
datanode.blockPoolTokenSecretManager
.generateDataEncryptionKey(block.getBlockPoolId()));
unbufProxyOut = encryptedStreams.out;
unbufProxyIn = encryptedStreams.in;
}
proxyOut = new DataOutputStream(new BufferedOutputStream(unbufProxyOut, proxyOut = new DataOutputStream(new BufferedOutputStream(unbufProxyOut,
HdfsConstants.SMALL_BUFFER_SIZE)); HdfsConstants.SMALL_BUFFER_SIZE));

View File

@ -43,6 +43,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.DirectoryListing; import org.apache.hadoop.hdfs.protocol.DirectoryListing;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager; import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
import org.apache.hadoop.hdfs.server.common.JspHelper; import org.apache.hadoop.hdfs.server.common.JspHelper;
@ -510,7 +511,7 @@ public class DatanodeJspHelper {
JspHelper.streamBlockInAscii(new InetSocketAddress(req.getServerName(), JspHelper.streamBlockInAscii(new InetSocketAddress(req.getServerName(),
datanodePort), bpid, blockId, blockToken, genStamp, blockSize, datanodePort), bpid, blockId, blockToken, genStamp, blockSize,
startOffset, chunkSizeToView, out, conf, dfs.getConf(), startOffset, chunkSizeToView, out, conf, dfs.getConf(),
dfs.getDataEncryptionKey()); dfs, getSaslDataTransferClient(req));
} catch (Exception e) { } catch (Exception e) {
out.print(e); out.print(e);
} }
@ -669,7 +670,7 @@ public class DatanodeJspHelper {
out.print("<textarea cols=\"100\" rows=\"25\" wrap=\"virtual\" style=\"width:100%\" READONLY>"); out.print("<textarea cols=\"100\" rows=\"25\" wrap=\"virtual\" style=\"width:100%\" READONLY>");
JspHelper.streamBlockInAscii(addr, poolId, blockId, accessToken, genStamp, JspHelper.streamBlockInAscii(addr, poolId, blockId, accessToken, genStamp,
blockSize, startOffset, chunkSizeToView, out, conf, dfs.getConf(), blockSize, startOffset, chunkSizeToView, out, conf, dfs.getConf(),
dfs.getDataEncryptionKey()); dfs, getSaslDataTransferClient(req));
out.print("</textarea>"); out.print("</textarea>");
dfs.close(); dfs.close();
} }
@ -702,4 +703,16 @@ public class DatanodeJspHelper {
return sb.toString(); return sb.toString();
} }
/**
* Gets the {@link SaslDataTransferClient} from the {@link DataNode} attached
* to the servlet context.
*
* @return SaslDataTransferClient from DataNode
*/
private static SaslDataTransferClient getSaslDataTransferClient(
HttpServletRequest req) {
DataNode dataNode = (DataNode)req.getSession().getServletContext()
.getAttribute("datanode");
return dataNode.saslClient;
}
} }

View File

@ -17,6 +17,9 @@
*/ */
package org.apache.hadoop.hdfs.server.namenode; package org.apache.hadoop.hdfs.server.namenode;
import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT;
import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY;
import java.io.FileNotFoundException; import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.io.OutputStream; import java.io.OutputStream;
@ -47,6 +50,7 @@ import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.RemotePeerFactory; import org.apache.hadoop.hdfs.RemotePeerFactory;
import org.apache.hadoop.hdfs.net.Peer; import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.hdfs.net.TcpPeerServer; import org.apache.hadoop.hdfs.net.TcpPeerServer;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.DirectoryListing; import org.apache.hadoop.hdfs.protocol.DirectoryListing;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@ -55,6 +59,12 @@ import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus; import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver;
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil;
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory;
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy; import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementStatus; import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementStatus;
import org.apache.hadoop.hdfs.server.blockmanagement.NumberReplicas; import org.apache.hadoop.hdfs.server.blockmanagement.NumberReplicas;
@ -65,6 +75,7 @@ import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.NodeBase; import org.apache.hadoop.net.NodeBase;
import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.Time; import org.apache.hadoop.util.Time;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
@ -92,7 +103,7 @@ import com.google.common.annotations.VisibleForTesting;
* factors of each file. * factors of each file.
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
public class NamenodeFsck { public class NamenodeFsck implements DataEncryptionKeyFactory {
public static final Log LOG = LogFactory.getLog(NameNode.class.getName()); public static final Log LOG = LogFactory.getLog(NameNode.class.getName());
// return string marking fsck status // return string marking fsck status
@ -149,6 +160,7 @@ public class NamenodeFsck {
private List<String> snapshottableDirs = null; private List<String> snapshottableDirs = null;
private final BlockPlacementPolicy bpPolicy; private final BlockPlacementPolicy bpPolicy;
private final SaslDataTransferClient saslClient;
/** /**
* Filesystem checker. * Filesystem checker.
@ -175,6 +187,12 @@ public class NamenodeFsck {
networktopology, networktopology,
namenode.getNamesystem().getBlockManager().getDatanodeManager() namenode.getNamesystem().getBlockManager().getDatanodeManager()
.getHost2DatanodeMap()); .getHost2DatanodeMap());
this.saslClient = new SaslDataTransferClient(
DataTransferSaslUtil.getSaslPropertiesResolver(conf),
TrustedChannelResolver.getInstance(conf),
conf.getBoolean(
IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY,
IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT));
for (Iterator<String> it = pmap.keySet().iterator(); it.hasNext();) { for (Iterator<String> it = pmap.keySet().iterator(); it.hasNext();) {
String key = it.next(); String key = it.next();
@ -616,15 +634,16 @@ public class NamenodeFsck {
setConfiguration(namenode.conf). setConfiguration(namenode.conf).
setRemotePeerFactory(new RemotePeerFactory() { setRemotePeerFactory(new RemotePeerFactory() {
@Override @Override
public Peer newConnectedPeer(InetSocketAddress addr) public Peer newConnectedPeer(InetSocketAddress addr,
Token<BlockTokenIdentifier> blockToken, DatanodeID datanodeId)
throws IOException { throws IOException {
Peer peer = null; Peer peer = null;
Socket s = NetUtils.getDefaultSocketFactory(conf).createSocket(); Socket s = NetUtils.getDefaultSocketFactory(conf).createSocket();
try { try {
s.connect(addr, HdfsServerConstants.READ_TIMEOUT); s.connect(addr, HdfsServerConstants.READ_TIMEOUT);
s.setSoTimeout(HdfsServerConstants.READ_TIMEOUT); s.setSoTimeout(HdfsServerConstants.READ_TIMEOUT);
peer = TcpPeerServer.peerFromSocketAndKey(s, namenode.getRpcServer(). peer = TcpPeerServer.peerFromSocketAndKey(saslClient, s,
getDataEncryptionKey()); NamenodeFsck.this, blockToken, datanodeId);
} finally { } finally {
if (peer == null) { if (peer == null) {
IOUtils.closeQuietly(s); IOUtils.closeQuietly(s);
@ -664,6 +683,11 @@ public class NamenodeFsck {
} }
} }
@Override
public DataEncryptionKey newDataEncryptionKey() throws IOException {
return namenode.getRpcServer().getDataEncryptionKey();
}
/* /*
* XXX (ab) See comment above for copyBlock(). * XXX (ab) See comment above for copyBlock().
* *

View File

@ -1451,6 +1451,37 @@
</description> </description>
</property> </property>
<property>
<name>dfs.data.transfer.protection</name>
<value></value>
<description>
A comma-separated list of SASL protection values used for secured
connections to the DataNode when reading or writing block data. Possible
values are authentication, integrity and privacy. authentication means
authentication only and no integrity or privacy; integrity implies
authentication and integrity are enabled; and privacy implies all of
authentication, integrity and privacy are enabled. If
dfs.encrypt.data.transfer is set to true, then it supersedes the setting for
dfs.data.transfer.protection and enforces that all connections must use a
specialized encrypted SASL handshake. This property is ignored for
connections to a DataNode listening on a privileged port. In this case, it
is assumed that the use of a privileged port establishes sufficient trust.
</description>
</property>
<property>
<name>dfs.data.transfer.saslproperties.resolver.class</name>
<value></value>
<description>
SaslPropertiesResolver used to resolve the QOP used for a connection to the
DataNode when reading or writing block data. If not specified, the full set
of values specified in dfs.data.transfer.protection is used while
determining the QOP used for the connection. If a class is specified, then
the QOP values returned by the class will be used while determining the QOP
used for the connection.
</description>
</property>
<property> <property>
<name>dfs.datanode.hdfs-blocks-metadata.enabled</name> <name>dfs.datanode.hdfs-blocks-metadata.enabled</name>
<value>false</value> <value>false</value>

View File

@ -33,9 +33,11 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.net.Peer; import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.hdfs.net.TcpPeerServer; import org.apache.hadoop.hdfs.net.TcpPeerServer;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.blockmanagement.CacheReplicationMonitor; import org.apache.hadoop.hdfs.server.blockmanagement.CacheReplicationMonitor;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy; import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
@ -48,6 +50,7 @@ import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache;
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitReplica; import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitReplica;
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm; import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm;
import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.token.Token;
import org.apache.log4j.Level; import org.apache.log4j.Level;
import org.apache.log4j.LogManager; import org.apache.log4j.LogManager;
@ -192,7 +195,8 @@ public class BlockReaderTestUtil {
setAllowShortCircuitLocalReads(true). setAllowShortCircuitLocalReads(true).
setRemotePeerFactory(new RemotePeerFactory() { setRemotePeerFactory(new RemotePeerFactory() {
@Override @Override
public Peer newConnectedPeer(InetSocketAddress addr) public Peer newConnectedPeer(InetSocketAddress addr,
Token<BlockTokenIdentifier> blockToken, DatanodeID datanodeId)
throws IOException { throws IOException {
Peer peer = null; Peer peer = null;
Socket sock = NetUtils. Socket sock = NetUtils.

View File

@ -19,12 +19,15 @@ package org.apache.hadoop.hdfs;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY; import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY; import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY;
import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SASL_DEFAULT;
import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SASL_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTP_ADDRESS_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTP_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_LOGROLL_PERIOD_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_LOGROLL_PERIOD_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODE_ID_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODE_ID_KEY;
@ -1310,15 +1313,42 @@ public class MiniDFSCluster {
} }
SecureResources secureResources = null; SecureResources secureResources = null;
if (UserGroupInformation.isSecurityEnabled()) { if (UserGroupInformation.isSecurityEnabled() &&
conf.get(DFS_DATA_TRANSFER_PROTECTION_KEY) == null) {
try { try {
secureResources = SecureDataNodeStarter.getSecureResources(dnConf); secureResources = SecureDataNodeStarter.getSecureResources(dnConf);
} catch (Exception ex) { } catch (Exception ex) {
ex.printStackTrace(); ex.printStackTrace();
} }
} }
DataNode dn = DataNode.instantiateDataNode(dnArgs, dnConf, final int maxRetriesOnSasl = conf.getInt(
IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SASL_KEY,
IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SASL_DEFAULT);
int numRetries = 0;
DataNode dn = null;
while (true) {
try {
dn = DataNode.instantiateDataNode(dnArgs, dnConf,
secureResources); secureResources);
break;
} catch (IOException e) {
// Work around issue testing security where rapidly starting multiple
// DataNodes using the same principal gets rejected by the KDC as a
// replay attack.
if (UserGroupInformation.isSecurityEnabled() &&
numRetries < maxRetriesOnSasl) {
try {
Thread.sleep(1000);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
break;
}
++numRetries;
continue;
}
throw e;
}
}
if(dn == null) if(dn == null)
throw new IOException("Cannot start DataNode in " throw new IOException("Cannot start DataNode in "
+ dnConf.get(DFS_DATANODE_DATA_DIR_KEY)); + dnConf.get(DFS_DATANODE_DATA_DIR_KEY));

View File

@ -0,0 +1,110 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.protocol.datatransfer.sasl;
import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SASL_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KERBEROS_PRINCIPAL_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HTTP_POLICY_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY;
import static org.junit.Assert.*;
import java.io.File;
import java.util.Properties;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.http.HttpConfig;
import org.apache.hadoop.minikdc.MiniKdc;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
import org.apache.hadoop.security.ssl.KeyStoreTestUtil;
import org.junit.AfterClass;
import org.junit.BeforeClass;
public abstract class SaslDataTransferTestCase {
private static File baseDir;
private static String hdfsPrincipal;
private static MiniKdc kdc;
private static String keytab;
private static String spnegoPrincipal;
@BeforeClass
public static void initKdc() throws Exception {
baseDir = new File(System.getProperty("test.build.dir", "target/test-dir"),
SaslDataTransferTestCase.class.getSimpleName());
FileUtil.fullyDelete(baseDir);
assertTrue(baseDir.mkdirs());
Properties kdcConf = MiniKdc.createConf();
kdc = new MiniKdc(kdcConf, baseDir);
kdc.start();
String userName = UserGroupInformation.getLoginUser().getShortUserName();
File keytabFile = new File(baseDir, userName + ".keytab");
keytab = keytabFile.getAbsolutePath();
kdc.createPrincipal(keytabFile, userName + "/localhost", "HTTP/localhost");
hdfsPrincipal = userName + "/localhost@" + kdc.getRealm();
spnegoPrincipal = "HTTP/localhost@" + kdc.getRealm();
}
@AfterClass
public static void shutdownKdc() {
if (kdc != null) {
kdc.stop();
}
FileUtil.fullyDelete(baseDir);
}
/**
* Creates configuration for starting a secure cluster.
*
* @param dataTransferProtection supported QOPs
* @return configuration for starting a secure cluster
* @throws Exception if there is any failure
*/
protected HdfsConfiguration createSecureConfig(
String dataTransferProtection) throws Exception {
HdfsConfiguration conf = new HdfsConfiguration();
SecurityUtil.setAuthenticationMethod(AuthenticationMethod.KERBEROS, conf);
conf.set(DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY, hdfsPrincipal);
conf.set(DFS_NAMENODE_KEYTAB_FILE_KEY, keytab);
conf.set(DFS_DATANODE_KERBEROS_PRINCIPAL_KEY, hdfsPrincipal);
conf.set(DFS_DATANODE_KEYTAB_FILE_KEY, keytab);
conf.set(DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY, spnegoPrincipal);
conf.setBoolean(DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);
conf.set(DFS_DATA_TRANSFER_PROTECTION_KEY, dataTransferProtection);
conf.set(DFS_HTTP_POLICY_KEY, HttpConfig.Policy.HTTPS_ONLY.name());
conf.set(DFS_NAMENODE_HTTPS_ADDRESS_KEY, "localhost:0");
conf.set(DFS_DATANODE_HTTPS_ADDRESS_KEY, "localhost:0");
conf.setInt(IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SASL_KEY, 10);
String keystoresDir = baseDir.getAbsolutePath();
String sslConfDir = KeyStoreTestUtil.getClasspathDir(this.getClass());
KeyStoreTestUtil.setupSSLConfig(keystoresDir, sslConfDir, conf, false);
return conf;
}
}

View File

@ -0,0 +1,155 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.protocol.datatransfer.sasl;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY;
import static org.junit.Assert.*;
import java.io.IOException;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileSystemTestHelper;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.io.IOUtils;
import org.junit.After;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
public class TestSaslDataTransfer extends SaslDataTransferTestCase {
private static final int BLOCK_SIZE = 4096;
private static final int BUFFER_SIZE= 1024;
private static final int NUM_BLOCKS = 3;
private static final Path PATH = new Path("/file1");
private static final short REPLICATION = 3;
private MiniDFSCluster cluster;
private FileSystem fs;
@Rule
public ExpectedException exception = ExpectedException.none();
@After
public void shutdown() {
IOUtils.cleanup(null, fs);
if (cluster != null) {
cluster.shutdown();
}
}
@Test
public void testAuthentication() throws Exception {
HdfsConfiguration clusterConf = createSecureConfig(
"authentication,integrity,privacy");
startCluster(clusterConf);
HdfsConfiguration clientConf = new HdfsConfiguration(clusterConf);
clientConf.set(DFS_DATA_TRANSFER_PROTECTION_KEY, "authentication");
doTest(clientConf);
}
@Test
public void testIntegrity() throws Exception {
HdfsConfiguration clusterConf = createSecureConfig(
"authentication,integrity,privacy");
startCluster(clusterConf);
HdfsConfiguration clientConf = new HdfsConfiguration(clusterConf);
clientConf.set(DFS_DATA_TRANSFER_PROTECTION_KEY, "integrity");
doTest(clientConf);
}
@Test
public void testPrivacy() throws Exception {
HdfsConfiguration clusterConf = createSecureConfig(
"authentication,integrity,privacy");
startCluster(clusterConf);
HdfsConfiguration clientConf = new HdfsConfiguration(clusterConf);
clientConf.set(DFS_DATA_TRANSFER_PROTECTION_KEY, "privacy");
doTest(clientConf);
}
@Test
public void testClientAndServerDoNotHaveCommonQop() throws Exception {
HdfsConfiguration clusterConf = createSecureConfig("privacy");
startCluster(clusterConf);
HdfsConfiguration clientConf = new HdfsConfiguration(clusterConf);
clientConf.set(DFS_DATA_TRANSFER_PROTECTION_KEY, "authentication");
exception.expect(IOException.class);
exception.expectMessage("could only be replicated to 0 nodes");
doTest(clientConf);
}
@Test
public void testClientSaslNoServerSasl() throws Exception {
HdfsConfiguration clusterConf = createSecureConfig("");
startCluster(clusterConf);
HdfsConfiguration clientConf = new HdfsConfiguration(clusterConf);
clientConf.set(DFS_DATA_TRANSFER_PROTECTION_KEY, "authentication");
exception.expect(IOException.class);
exception.expectMessage("could only be replicated to 0 nodes");
doTest(clientConf);
}
@Test
public void testServerSaslNoClientSasl() throws Exception {
HdfsConfiguration clusterConf = createSecureConfig(
"authentication,integrity,privacy");
startCluster(clusterConf);
HdfsConfiguration clientConf = new HdfsConfiguration(clusterConf);
clientConf.set(DFS_DATA_TRANSFER_PROTECTION_KEY, "");
exception.expect(IOException.class);
exception.expectMessage("could only be replicated to 0 nodes");
doTest(clientConf);
}
/**
* Tests DataTransferProtocol with the given client configuration.
*
* @param conf client configuration
* @throws IOException if there is an I/O error
*/
private void doTest(HdfsConfiguration conf) throws IOException {
fs = FileSystem.get(cluster.getURI(), conf);
FileSystemTestHelper.createFile(fs, PATH, NUM_BLOCKS, BLOCK_SIZE);
assertArrayEquals(FileSystemTestHelper.getFileData(NUM_BLOCKS, BLOCK_SIZE),
DFSTestUtil.readFile(fs, PATH).getBytes("UTF-8"));
BlockLocation[] blockLocations = fs.getFileBlockLocations(PATH, 0,
Long.MAX_VALUE);
assertNotNull(blockLocations);
assertEquals(NUM_BLOCKS, blockLocations.length);
for (BlockLocation blockLocation: blockLocations) {
assertNotNull(blockLocation.getHosts());
assertEquals(3, blockLocation.getHosts().length);
}
}
/**
* Starts a cluster with the given configuration.
*
* @param conf cluster configuration
* @throws IOException if there is an I/O error
*/
private void startCluster(HdfsConfiguration conf) throws IOException {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
cluster.waitActive();
}
}

View File

@ -0,0 +1,41 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.balancer;
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferTestCase;
import org.junit.Test;
public class TestBalancerWithSaslDataTransfer extends SaslDataTransferTestCase {
private static final TestBalancer TEST_BALANCER = new TestBalancer();
@Test
public void testBalancer0Authentication() throws Exception {
TEST_BALANCER.testBalancer0Internal(createSecureConfig("authentication"));
}
@Test
public void testBalancer0Integrity() throws Exception {
TEST_BALANCER.testBalancer0Internal(createSecureConfig("integrity"));
}
@Test
public void testBalancer0Privacy() throws Exception {
TEST_BALANCER.testBalancer0Internal(createSecureConfig("privacy"));
}
}

View File

@ -45,6 +45,7 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.RemotePeerFactory; import org.apache.hadoop.hdfs.RemotePeerFactory;
import org.apache.hadoop.hdfs.net.Peer; import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.hdfs.net.TcpPeerServer; import org.apache.hadoop.hdfs.net.TcpPeerServer;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlock;
@ -160,7 +161,8 @@ public class TestBlockTokenWithDFS {
setConfiguration(conf). setConfiguration(conf).
setRemotePeerFactory(new RemotePeerFactory() { setRemotePeerFactory(new RemotePeerFactory() {
@Override @Override
public Peer newConnectedPeer(InetSocketAddress addr) public Peer newConnectedPeer(InetSocketAddress addr,
Token<BlockTokenIdentifier> blockToken, DatanodeID datanodeId)
throws IOException { throws IOException {
Peer peer = null; Peer peer = null;
Socket sock = NetUtils.getDefaultSocketFactory(conf).createSocket(); Socket sock = NetUtils.getDefaultSocketFactory(conf).createSocket();

View File

@ -46,9 +46,11 @@ import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.hdfs.net.TcpPeerServer; import org.apache.hadoop.hdfs.net.TcpPeerServer;
import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil; import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
@ -58,6 +60,7 @@ import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport; import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.token.Token;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
@ -307,7 +310,8 @@ public class TestDataNodeVolumeFailure {
setConfiguration(conf). setConfiguration(conf).
setRemotePeerFactory(new RemotePeerFactory() { setRemotePeerFactory(new RemotePeerFactory() {
@Override @Override
public Peer newConnectedPeer(InetSocketAddress addr) public Peer newConnectedPeer(InetSocketAddress addr,
Token<BlockTokenIdentifier> blockToken, DatanodeID datanodeId)
throws IOException { throws IOException {
Peer peer = null; Peer peer = null;
Socket sock = NetUtils.getDefaultSocketFactory(conf).createSocket(); Socket sock = NetUtils.getDefaultSocketFactory(conf).createSocket();