diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 1866587de40..f2cc10b7046 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -34,6 +34,9 @@ Release 2.6.0 - UNRELEASED
HDFS-3851. DFSOutputStream class code cleanup. (Jing Zhao via suresh)
+ HDFS-2856. Fix block protocol so that Datanodes don't require root or jsvc.
+ (cnauroth)
+
OPTIMIZATIONS
BUG FIXES
diff --git a/hadoop-hdfs-project/hadoop-hdfs/pom.xml b/hadoop-hdfs-project/hadoop-hdfs/pom.xml
index 4c454e478af..d19ac38cee8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/pom.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/pom.xml
@@ -145,6 +145,11 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd">
junit
test
+
+ org.apache.hadoop
+ hadoop-minikdc
+ test
+
org.mockito
mockito-all
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
index 3355404c7a7..d27bd6ef0d2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
@@ -744,7 +744,8 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
}
}
try {
- Peer peer = remotePeerFactory.newConnectedPeer(inetSocketAddress);
+ Peer peer = remotePeerFactory.newConnectedPeer(inetSocketAddress, token,
+ datanode);
if (LOG.isTraceEnabled()) {
LOG.trace("nextTcpPeer: created newConnectedPeer " + peer);
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
index 14fbeef1794..9edb3db6585 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
@@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hdfs;
+import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT;
+import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT;
@@ -93,7 +95,6 @@ import com.google.common.collect.Lists;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.BlockStorageLocation;
@@ -136,6 +137,7 @@ import org.apache.hadoop.hdfs.protocol.CachePoolIterator;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks;
import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@@ -153,16 +155,19 @@ import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException;
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
-import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor;
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
import org.apache.hadoop.hdfs.protocol.datatransfer.Op;
import org.apache.hadoop.hdfs.protocol.datatransfer.ReplaceDatanodeOnFailure;
import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver;
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.protocolPB.PBHelper;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
@@ -209,7 +214,8 @@ import com.google.common.net.InetAddresses;
*
********************************************************/
@InterfaceAudience.Private
-public class DFSClient implements java.io.Closeable, RemotePeerFactory {
+public class DFSClient implements java.io.Closeable, RemotePeerFactory,
+ DataEncryptionKeyFactory {
public static final Log LOG = LogFactory.getLog(DFSClient.class);
public static final long SERVER_DEFAULTS_VALIDITY_PERIOD = 60 * 60 * 1000L; // 1 hour
static final int TCP_WINDOW_SIZE = 128 * 1024; // 128 KB
@@ -233,7 +239,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory {
private final Random r = new Random();
private SocketAddress[] localInterfaceAddrs;
private DataEncryptionKey encryptionKey;
- final TrustedChannelResolver trustedChannelResolver;
+ final SaslDataTransferClient saslClient;
private final CachingStrategy defaultReadCachingStrategy;
private final CachingStrategy defaultWriteCachingStrategy;
private final ClientContext clientContext;
@@ -635,7 +641,12 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory {
if (numThreads > 0) {
this.initThreadsNumForHedgedReads(numThreads);
}
- this.trustedChannelResolver = TrustedChannelResolver.getInstance(getConfiguration());
+ this.saslClient = new SaslDataTransferClient(
+ DataTransferSaslUtil.getSaslPropertiesResolver(conf),
+ TrustedChannelResolver.getInstance(conf),
+ conf.getBoolean(
+ IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY,
+ IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT));
}
/**
@@ -1805,23 +1816,6 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory {
UnresolvedPathException.class);
}
}
-
- /**
- * Get the checksum of the whole file of a range of the file. Note that the
- * range always starts from the beginning of the file.
- * @param src The file path
- * @param length The length of the range
- * @return The checksum
- * @see DistributedFileSystem#getFileChecksum(Path)
- */
- public MD5MD5CRC32FileChecksum getFileChecksum(String src, long length)
- throws IOException {
- checkOpen();
- Preconditions.checkArgument(length >= 0);
- return getFileChecksum(src, length, clientName, namenode,
- socketFactory, dfsClientConf.socketTimeout, getDataEncryptionKey(),
- dfsClientConf.connectToDnViaHostname);
- }
@InterfaceAudience.Private
public void clearDataEncryptionKey() {
@@ -1841,11 +1835,9 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory {
return d == null ? false : d.getEncryptDataTransfer();
}
- @InterfaceAudience.Private
- public DataEncryptionKey getDataEncryptionKey()
- throws IOException {
- if (shouldEncryptData() &&
- !this.trustedChannelResolver.isTrusted()) {
+ @Override
+ public DataEncryptionKey newDataEncryptionKey() throws IOException {
+ if (shouldEncryptData()) {
synchronized (this) {
if (encryptionKey == null ||
encryptionKey.expiryDate < Time.now()) {
@@ -1860,22 +1852,17 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory {
}
/**
- * Get the checksum of the whole file or a range of the file.
+ * Get the checksum of the whole file of a range of the file. Note that the
+ * range always starts from the beginning of the file.
* @param src The file path
* @param length the length of the range, i.e., the range is [0, length]
- * @param clientName the name of the client requesting the checksum.
- * @param namenode the RPC proxy for the namenode
- * @param socketFactory to create sockets to connect to DNs
- * @param socketTimeout timeout to use when connecting and waiting for a response
- * @param encryptionKey the key needed to communicate with DNs in this cluster
- * @param connectToDnViaHostname whether the client should use hostnames instead of IPs
* @return The checksum
+ * @see DistributedFileSystem#getFileChecksum(Path)
*/
- private static MD5MD5CRC32FileChecksum getFileChecksum(String src,
- long length, String clientName, ClientProtocol namenode,
- SocketFactory socketFactory, int socketTimeout,
- DataEncryptionKey encryptionKey, boolean connectToDnViaHostname)
+ public MD5MD5CRC32FileChecksum getFileChecksum(String src, long length)
throws IOException {
+ checkOpen();
+ Preconditions.checkArgument(length >= 0);
//get block locations for the file range
LocatedBlocks blockLocations = callGetBlockLocations(namenode, src, 0,
length);
@@ -1910,7 +1897,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory {
final DatanodeInfo[] datanodes = lb.getLocations();
//try each datanode location of the block
- final int timeout = 3000 * datanodes.length + socketTimeout;
+ final int timeout = 3000 * datanodes.length + dfsClientConf.socketTimeout;
boolean done = false;
for(int j = 0; !done && j < datanodes.length; j++) {
DataOutputStream out = null;
@@ -1918,8 +1905,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory {
try {
//connect to a datanode
- IOStreamPair pair = connectToDN(socketFactory, connectToDnViaHostname,
- encryptionKey, datanodes[j], timeout);
+ IOStreamPair pair = connectToDN(datanodes[j], timeout, lb);
out = new DataOutputStream(new BufferedOutputStream(pair.out,
HdfsConstants.SMALL_BUFFER_SIZE));
in = new DataInputStream(pair.in);
@@ -1975,9 +1961,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory {
} else {
LOG.debug("Retrieving checksum from an earlier-version DataNode: " +
"inferring checksum by reading first byte");
- ct = inferChecksumTypeByReading(
- clientName, socketFactory, socketTimeout, lb, datanodes[j],
- encryptionKey, connectToDnViaHostname);
+ ct = inferChecksumTypeByReading(lb, datanodes[j]);
}
if (i == 0) { // first block
@@ -2051,16 +2035,13 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory {
* Connect to the given datanode's datantrasfer port, and return
* the resulting IOStreamPair. This includes encryption wrapping, etc.
*/
- private static IOStreamPair connectToDN(
- SocketFactory socketFactory, boolean connectToDnViaHostname,
- DataEncryptionKey encryptionKey, DatanodeInfo dn, int timeout)
- throws IOException
- {
+ private IOStreamPair connectToDN(DatanodeInfo dn, int timeout,
+ LocatedBlock lb) throws IOException {
boolean success = false;
Socket sock = null;
try {
sock = socketFactory.createSocket();
- String dnAddr = dn.getXferAddr(connectToDnViaHostname);
+ String dnAddr = dn.getXferAddr(getConf().connectToDnViaHostname);
if (LOG.isDebugEnabled()) {
LOG.debug("Connecting to datanode " + dnAddr);
}
@@ -2069,13 +2050,8 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory {
OutputStream unbufOut = NetUtils.getOutputStream(sock);
InputStream unbufIn = NetUtils.getInputStream(sock);
- IOStreamPair ret;
- if (encryptionKey != null) {
- ret = DataTransferEncryptor.getEncryptedStreams(
- unbufOut, unbufIn, encryptionKey);
- } else {
- ret = new IOStreamPair(unbufIn, unbufOut);
- }
+ IOStreamPair ret = saslClient.newSocketSend(sock, unbufOut, unbufIn, this,
+ lb.getBlockToken(), dn);
success = true;
return ret;
} finally {
@@ -2091,21 +2067,14 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory {
* with older HDFS versions which did not include the checksum type in
* OpBlockChecksumResponseProto.
*
- * @param in input stream from datanode
- * @param out output stream to datanode
* @param lb the located block
- * @param clientName the name of the DFSClient requesting the checksum
* @param dn the connected datanode
* @return the inferred checksum type
* @throws IOException if an error occurs
*/
- private static Type inferChecksumTypeByReading(
- String clientName, SocketFactory socketFactory, int socketTimeout,
- LocatedBlock lb, DatanodeInfo dn,
- DataEncryptionKey encryptionKey, boolean connectToDnViaHostname)
+ private Type inferChecksumTypeByReading(LocatedBlock lb, DatanodeInfo dn)
throws IOException {
- IOStreamPair pair = connectToDN(socketFactory, connectToDnViaHostname,
- encryptionKey, dn, socketTimeout);
+ IOStreamPair pair = connectToDN(dn, dfsClientConf.socketTimeout, lb);
try {
DataOutputStream out = new DataOutputStream(new BufferedOutputStream(pair.out,
@@ -2858,7 +2827,9 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory {
}
@Override // RemotePeerFactory
- public Peer newConnectedPeer(InetSocketAddress addr) throws IOException {
+ public Peer newConnectedPeer(InetSocketAddress addr,
+ Token blockToken, DatanodeID datanodeId)
+ throws IOException {
Peer peer = null;
boolean success = false;
Socket sock = null;
@@ -2867,8 +2838,8 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory {
NetUtils.connect(sock, addr,
getRandomLocalInterfaceAddr(),
dfsClientConf.socketTimeout);
- peer = TcpPeerServer.peerFromSocketAndKey(sock,
- getDataEncryptionKey());
+ peer = TcpPeerServer.peerFromSocketAndKey(saslClient, sock, this,
+ blockToken, datanodeId);
success = true;
return peer;
} finally {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index d41239c5639..aea6b72acef 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -565,6 +565,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final boolean DFS_ENCRYPT_DATA_TRANSFER_DEFAULT = false;
public static final String DFS_DATA_ENCRYPTION_ALGORITHM_KEY = "dfs.encrypt.data.transfer.algorithm";
public static final String DFS_TRUSTEDCHANNEL_RESOLVER_CLASS = "dfs.trustedchannel.resolver.class";
+ public static final String DFS_DATA_TRANSFER_PROTECTION_KEY = "dfs.data.transfer.protection";
+ public static final String DFS_DATA_TRANSFER_SASL_PROPS_RESOLVER_CLASS_KEY = "dfs.data.transfer.saslproperties.resolver.class";
// Journal-node related configs. These are read on the JN side.
public static final String DFS_JOURNALNODE_EDITS_DIR_KEY = "dfs.journalnode.edits.dir";
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
index 920ef9f4048..9d1acf668ce 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
@@ -61,7 +61,6 @@ import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException;
import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
-import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
@@ -1047,14 +1046,10 @@ public class DFSOutputStream extends FSOutputSummer
OutputStream unbufOut = NetUtils.getOutputStream(sock, writeTimeout);
InputStream unbufIn = NetUtils.getInputStream(sock);
- if (dfsClient.shouldEncryptData() &&
- !dfsClient.trustedChannelResolver.isTrusted(sock.getInetAddress())) {
- IOStreamPair encryptedStreams =
- DataTransferEncryptor.getEncryptedStreams(
- unbufOut, unbufIn, dfsClient.getDataEncryptionKey());
- unbufOut = encryptedStreams.out;
- unbufIn = encryptedStreams.in;
- }
+ IOStreamPair saslStreams = dfsClient.saslClient.socketSend(sock,
+ unbufOut, unbufIn, dfsClient, blockToken, src);
+ unbufOut = saslStreams.out;
+ unbufIn = saslStreams.in;
out = new DataOutputStream(new BufferedOutputStream(unbufOut,
HdfsConstants.SMALL_BUFFER_SIZE));
in = new DataInputStream(unbufIn);
@@ -1325,14 +1320,10 @@ public class DFSOutputStream extends FSOutputSummer
OutputStream unbufOut = NetUtils.getOutputStream(s, writeTimeout);
InputStream unbufIn = NetUtils.getInputStream(s);
- if (dfsClient.shouldEncryptData() &&
- !dfsClient.trustedChannelResolver.isTrusted(s.getInetAddress())) {
- IOStreamPair encryptedStreams =
- DataTransferEncryptor.getEncryptedStreams(unbufOut,
- unbufIn, dfsClient.getDataEncryptionKey());
- unbufOut = encryptedStreams.out;
- unbufIn = encryptedStreams.in;
- }
+ IOStreamPair saslStreams = dfsClient.saslClient.socketSend(s,
+ unbufOut, unbufIn, dfsClient, accessToken, nodes[0]);
+ unbufOut = saslStreams.out;
+ unbufIn = saslStreams.in;
out = new DataOutputStream(new BufferedOutputStream(unbufOut,
HdfsConstants.SMALL_BUFFER_SIZE));
blockReplyStream = new DataInputStream(unbufIn);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemotePeerFactory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemotePeerFactory.java
index 17cdb3060c4..5afff0081b8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemotePeerFactory.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemotePeerFactory.java
@@ -21,15 +21,21 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import org.apache.hadoop.hdfs.net.Peer;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.security.token.Token;
public interface RemotePeerFactory {
/**
* @param addr The address to connect to.
- *
+ * @param blockToken Token used during optional SASL negotiation
+ * @param datanodeId ID of destination DataNode
* @return A new Peer connected to the address.
*
* @throws IOException If there was an error connecting or creating
* the remote socket, encrypted stream, etc.
*/
- Peer newConnectedPeer(InetSocketAddress addr) throws IOException;
+ Peer newConnectedPeer(InetSocketAddress addr,
+ Token blockToken, DatanodeID datanodeId)
+ throws IOException;
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/EncryptedPeer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/EncryptedPeer.java
index e13213dc663..da660c7d464 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/EncryptedPeer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/EncryptedPeer.java
@@ -19,9 +19,7 @@ package org.apache.hadoop.hdfs.net;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor;
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
-import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
import org.apache.hadoop.net.unix.DomainSocket;
import java.io.InputStream;
@@ -51,11 +49,8 @@ public class EncryptedPeer implements Peer {
*/
private final ReadableByteChannel channel;
- public EncryptedPeer(Peer enclosedPeer, DataEncryptionKey key)
- throws IOException {
+ public EncryptedPeer(Peer enclosedPeer, IOStreamPair ios) {
this.enclosedPeer = enclosedPeer;
- IOStreamPair ios = DataTransferEncryptor.getEncryptedStreams(
- enclosedPeer.getOutputStream(), enclosedPeer.getInputStream(), key);
this.in = ios.in;
this.out = ios.out;
this.channel = ios.in instanceof ReadableByteChannel ?
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/TcpPeerServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/TcpPeerServer.java
index ddef592ec7c..2a547e0dcf4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/TcpPeerServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/TcpPeerServer.java
@@ -28,10 +28,14 @@ import java.nio.channels.SocketChannel;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter.SecureResources;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.security.token.Token;
@InterfaceAudience.Private
public class TcpPeerServer implements PeerServer {
@@ -74,15 +78,16 @@ public class TcpPeerServer implements PeerServer {
}
}
- public static Peer peerFromSocketAndKey(Socket s,
- DataEncryptionKey key) throws IOException {
+ public static Peer peerFromSocketAndKey(
+ SaslDataTransferClient saslClient, Socket s,
+ DataEncryptionKeyFactory keyFactory,
+ Token blockToken, DatanodeID datanodeId)
+ throws IOException {
Peer peer = null;
boolean success = false;
try {
- peer = peerFromSocket(s);
- if (key != null) {
- peer = new EncryptedPeer(peer, key);
- }
+ peer = peerFromSocket(s);
+ peer = saslClient.peerSend(peer, keyFactory, blockToken, datanodeId);
success = true;
return peer;
} finally {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferEncryptor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferEncryptor.java
deleted file mode 100644
index e069a391b8b..00000000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferEncryptor.java
+++ /dev/null
@@ -1,506 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.protocol.datatransfer;
-
-import static org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.Map;
-import java.util.TreeMap;
-
-import javax.security.auth.callback.Callback;
-import javax.security.auth.callback.CallbackHandler;
-import javax.security.auth.callback.NameCallback;
-import javax.security.auth.callback.PasswordCallback;
-import javax.security.auth.callback.UnsupportedCallbackException;
-import javax.security.sasl.AuthorizeCallback;
-import javax.security.sasl.RealmCallback;
-import javax.security.sasl.RealmChoiceCallback;
-import javax.security.sasl.Sasl;
-import javax.security.sasl.SaslClient;
-import javax.security.sasl.SaslException;
-import javax.security.sasl.SaslServer;
-
-import org.apache.commons.codec.binary.Base64;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto.DataTransferEncryptorStatus;
-import org.apache.hadoop.hdfs.security.token.block.BlockPoolTokenSecretManager;
-import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
-import org.apache.hadoop.security.SaslInputStream;
-import org.apache.hadoop.security.SaslOutputStream;
-
-import com.google.common.base.Charsets;
-import com.google.common.collect.Maps;
-import com.google.protobuf.ByteString;
-
-/**
- * A class which, given connected input/output streams, will perform a
- * handshake using those streams based on SASL to produce new Input/Output
- * streams which will encrypt/decrypt all data written/read from said streams.
- * Much of this is inspired by or borrowed from the TSaslTransport in Apache
- * Thrift, but with some HDFS-specific tweaks.
- */
-@InterfaceAudience.Private
-public class DataTransferEncryptor {
-
- public static final Log LOG = LogFactory.getLog(DataTransferEncryptor.class);
-
- /**
- * Sent by clients and validated by servers. We use a number that's unlikely
- * to ever be sent as the value of the DATA_TRANSFER_VERSION.
- */
- private static final int ENCRYPTED_TRANSFER_MAGIC_NUMBER = 0xDEADBEEF;
-
- /**
- * Delimiter for the three-part SASL username string.
- */
- private static final String NAME_DELIMITER = " ";
-
- // This has to be set as part of the SASL spec, but it don't matter for
- // our purposes, but may not be empty. It's sent over the wire, so use
- // a short string.
- private static final String SERVER_NAME = "0";
-
- private static final String PROTOCOL = "hdfs";
- private static final String MECHANISM = "DIGEST-MD5";
- private static final Map SASL_PROPS = new TreeMap();
-
- static {
- SASL_PROPS.put(Sasl.QOP, "auth-conf");
- SASL_PROPS.put(Sasl.SERVER_AUTH, "true");
- }
-
- /**
- * Factory method for DNs, where the nonce, keyId, and encryption key are not
- * yet known. The nonce and keyId will be sent by the client, and the DN
- * will then use those pieces of info and the secret key shared with the NN
- * to determine the encryptionKey used for the SASL handshake/encryption.
- *
- * Establishes a secure connection assuming that the party on the other end
- * has the same shared secret. This does a SASL connection handshake, but not
- * a general-purpose one. It's specific to the MD5-DIGEST SASL mechanism with
- * auth-conf enabled. In particular, it doesn't support an arbitrary number of
- * challenge/response rounds, and we know that the client will never have an
- * initial response, so we don't check for one.
- *
- * @param underlyingOut output stream to write to the other party
- * @param underlyingIn input stream to read from the other party
- * @param blockPoolTokenSecretManager secret manager capable of constructing
- * encryption key based on keyId, blockPoolId, and nonce
- * @return a pair of streams which wrap the given streams and encrypt/decrypt
- * all data read/written
- * @throws IOException in the event of error
- */
- public static IOStreamPair getEncryptedStreams(
- OutputStream underlyingOut, InputStream underlyingIn,
- BlockPoolTokenSecretManager blockPoolTokenSecretManager,
- String encryptionAlgorithm) throws IOException {
-
- DataInputStream in = new DataInputStream(underlyingIn);
- DataOutputStream out = new DataOutputStream(underlyingOut);
-
- Map saslProps = Maps.newHashMap(SASL_PROPS);
- saslProps.put("com.sun.security.sasl.digest.cipher", encryptionAlgorithm);
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Server using encryption algorithm " + encryptionAlgorithm);
- }
-
- SaslParticipant sasl = new SaslParticipant(Sasl.createSaslServer(MECHANISM,
- PROTOCOL, SERVER_NAME, saslProps,
- new SaslServerCallbackHandler(blockPoolTokenSecretManager)));
-
- int magicNumber = in.readInt();
- if (magicNumber != ENCRYPTED_TRANSFER_MAGIC_NUMBER) {
- throw new InvalidMagicNumberException(magicNumber);
- }
- try {
- // step 1
- performSaslStep1(out, in, sasl);
-
- // step 2 (server-side only)
- byte[] remoteResponse = readSaslMessage(in);
- byte[] localResponse = sasl.evaluateChallengeOrResponse(remoteResponse);
- sendSaslMessage(out, localResponse);
-
- // SASL handshake is complete
- checkSaslComplete(sasl);
-
- return sasl.createEncryptedStreamPair(out, in);
- } catch (IOException ioe) {
- if (ioe instanceof SaslException &&
- ioe.getCause() != null &&
- ioe.getCause() instanceof InvalidEncryptionKeyException) {
- // This could just be because the client is long-lived and hasn't gotten
- // a new encryption key from the NN in a while. Upon receiving this
- // error, the client will get a new encryption key from the NN and retry
- // connecting to this DN.
- sendInvalidKeySaslErrorMessage(out, ioe.getCause().getMessage());
- } else {
- sendGenericSaslErrorMessage(out, ioe.getMessage());
- }
- throw ioe;
- }
- }
-
- /**
- * Factory method for clients, where the encryption token is already created.
- *
- * Establishes a secure connection assuming that the party on the other end
- * has the same shared secret. This does a SASL connection handshake, but not
- * a general-purpose one. It's specific to the MD5-DIGEST SASL mechanism with
- * auth-conf enabled. In particular, it doesn't support an arbitrary number of
- * challenge/response rounds, and we know that the client will never have an
- * initial response, so we don't check for one.
- *
- * @param underlyingOut output stream to write to the other party
- * @param underlyingIn input stream to read from the other party
- * @param encryptionKey all info required to establish an encrypted stream
- * @return a pair of streams which wrap the given streams and encrypt/decrypt
- * all data read/written
- * @throws IOException in the event of error
- */
- public static IOStreamPair getEncryptedStreams(
- OutputStream underlyingOut, InputStream underlyingIn,
- DataEncryptionKey encryptionKey)
- throws IOException {
-
- Map saslProps = Maps.newHashMap(SASL_PROPS);
- saslProps.put("com.sun.security.sasl.digest.cipher",
- encryptionKey.encryptionAlgorithm);
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Client using encryption algorithm " +
- encryptionKey.encryptionAlgorithm);
- }
-
- DataOutputStream out = new DataOutputStream(underlyingOut);
- DataInputStream in = new DataInputStream(underlyingIn);
-
- String userName = getUserNameFromEncryptionKey(encryptionKey);
- SaslParticipant sasl = new SaslParticipant(Sasl.createSaslClient(
- new String[] { MECHANISM }, userName, PROTOCOL, SERVER_NAME, saslProps,
- new SaslClientCallbackHandler(encryptionKey.encryptionKey, userName)));
-
- out.writeInt(ENCRYPTED_TRANSFER_MAGIC_NUMBER);
- out.flush();
-
- try {
- // Start of handshake - "initial response" in SASL terminology.
- sendSaslMessage(out, new byte[0]);
-
- // step 1
- performSaslStep1(out, in, sasl);
-
- // step 2 (client-side only)
- byte[] remoteResponse = readSaslMessage(in);
- byte[] localResponse = sasl.evaluateChallengeOrResponse(remoteResponse);
- assert localResponse == null;
-
- // SASL handshake is complete
- checkSaslComplete(sasl);
-
- return sasl.createEncryptedStreamPair(out, in);
- } catch (IOException ioe) {
- sendGenericSaslErrorMessage(out, ioe.getMessage());
- throw ioe;
- }
- }
-
- private static void performSaslStep1(DataOutputStream out, DataInputStream in,
- SaslParticipant sasl) throws IOException {
- byte[] remoteResponse = readSaslMessage(in);
- byte[] localResponse = sasl.evaluateChallengeOrResponse(remoteResponse);
- sendSaslMessage(out, localResponse);
- }
-
- private static void checkSaslComplete(SaslParticipant sasl) throws IOException {
- if (!sasl.isComplete()) {
- throw new IOException("Failed to complete SASL handshake");
- }
-
- if (!sasl.supportsConfidentiality()) {
- throw new IOException("SASL handshake completed, but channel does not " +
- "support encryption");
- }
- }
-
- private static void sendSaslMessage(DataOutputStream out, byte[] payload)
- throws IOException {
- sendSaslMessage(out, DataTransferEncryptorStatus.SUCCESS, payload, null);
- }
-
- private static void sendInvalidKeySaslErrorMessage(DataOutputStream out,
- String message) throws IOException {
- sendSaslMessage(out, DataTransferEncryptorStatus.ERROR_UNKNOWN_KEY, null,
- message);
- }
-
- private static void sendGenericSaslErrorMessage(DataOutputStream out,
- String message) throws IOException {
- sendSaslMessage(out, DataTransferEncryptorStatus.ERROR, null, message);
- }
-
- private static void sendSaslMessage(OutputStream out,
- DataTransferEncryptorStatus status, byte[] payload, String message)
- throws IOException {
- DataTransferEncryptorMessageProto.Builder builder =
- DataTransferEncryptorMessageProto.newBuilder();
-
- builder.setStatus(status);
- if (payload != null) {
- builder.setPayload(ByteString.copyFrom(payload));
- }
- if (message != null) {
- builder.setMessage(message);
- }
-
- DataTransferEncryptorMessageProto proto = builder.build();
- proto.writeDelimitedTo(out);
- out.flush();
- }
-
- private static byte[] readSaslMessage(DataInputStream in) throws IOException {
- DataTransferEncryptorMessageProto proto =
- DataTransferEncryptorMessageProto.parseFrom(vintPrefixed(in));
- if (proto.getStatus() == DataTransferEncryptorStatus.ERROR_UNKNOWN_KEY) {
- throw new InvalidEncryptionKeyException(proto.getMessage());
- } else if (proto.getStatus() == DataTransferEncryptorStatus.ERROR) {
- throw new IOException(proto.getMessage());
- } else {
- return proto.getPayload().toByteArray();
- }
- }
-
- /**
- * Set the encryption key when asked by the server-side SASL object.
- */
- private static class SaslServerCallbackHandler implements CallbackHandler {
-
- private final BlockPoolTokenSecretManager blockPoolTokenSecretManager;
-
- public SaslServerCallbackHandler(BlockPoolTokenSecretManager
- blockPoolTokenSecretManager) {
- this.blockPoolTokenSecretManager = blockPoolTokenSecretManager;
- }
-
- @Override
- public void handle(Callback[] callbacks) throws IOException,
- UnsupportedCallbackException {
- NameCallback nc = null;
- PasswordCallback pc = null;
- AuthorizeCallback ac = null;
- for (Callback callback : callbacks) {
- if (callback instanceof AuthorizeCallback) {
- ac = (AuthorizeCallback) callback;
- } else if (callback instanceof PasswordCallback) {
- pc = (PasswordCallback) callback;
- } else if (callback instanceof NameCallback) {
- nc = (NameCallback) callback;
- } else if (callback instanceof RealmCallback) {
- continue; // realm is ignored
- } else {
- throw new UnsupportedCallbackException(callback,
- "Unrecognized SASL DIGEST-MD5 Callback: " + callback);
- }
- }
-
- if (pc != null) {
- byte[] encryptionKey = getEncryptionKeyFromUserName(
- blockPoolTokenSecretManager, nc.getDefaultName());
- pc.setPassword(encryptionKeyToPassword(encryptionKey));
- }
-
- if (ac != null) {
- ac.setAuthorized(true);
- ac.setAuthorizedID(ac.getAuthorizationID());
- }
-
- }
-
- }
-
- /**
- * Set the encryption key when asked by the client-side SASL object.
- */
- private static class SaslClientCallbackHandler implements CallbackHandler {
-
- private final byte[] encryptionKey;
- private final String userName;
-
- public SaslClientCallbackHandler(byte[] encryptionKey, String userName) {
- this.encryptionKey = encryptionKey;
- this.userName = userName;
- }
-
- @Override
- public void handle(Callback[] callbacks) throws IOException,
- UnsupportedCallbackException {
- NameCallback nc = null;
- PasswordCallback pc = null;
- RealmCallback rc = null;
- for (Callback callback : callbacks) {
- if (callback instanceof RealmChoiceCallback) {
- continue;
- } else if (callback instanceof NameCallback) {
- nc = (NameCallback) callback;
- } else if (callback instanceof PasswordCallback) {
- pc = (PasswordCallback) callback;
- } else if (callback instanceof RealmCallback) {
- rc = (RealmCallback) callback;
- } else {
- throw new UnsupportedCallbackException(callback,
- "Unrecognized SASL client callback");
- }
- }
- if (nc != null) {
- nc.setName(userName);
- }
- if (pc != null) {
- pc.setPassword(encryptionKeyToPassword(encryptionKey));
- }
- if (rc != null) {
- rc.setText(rc.getDefaultText());
- }
- }
-
- }
-
- /**
- * The SASL username consists of the keyId, blockPoolId, and nonce with the
- * first two encoded as Strings, and the third encoded using Base64. The
- * fields are each separated by a single space.
- *
- * @param encryptionKey the encryption key to encode as a SASL username.
- * @return encoded username containing keyId, blockPoolId, and nonce
- */
- private static String getUserNameFromEncryptionKey(
- DataEncryptionKey encryptionKey) {
- return encryptionKey.keyId + NAME_DELIMITER +
- encryptionKey.blockPoolId + NAME_DELIMITER +
- new String(Base64.encodeBase64(encryptionKey.nonce, false), Charsets.UTF_8);
- }
-
- /**
- * Given a secret manager and a username encoded as described above, determine
- * the encryption key.
- *
- * @param blockPoolTokenSecretManager to determine the encryption key.
- * @param userName containing the keyId, blockPoolId, and nonce.
- * @return secret encryption key.
- * @throws IOException
- */
- private static byte[] getEncryptionKeyFromUserName(
- BlockPoolTokenSecretManager blockPoolTokenSecretManager, String userName)
- throws IOException {
- String[] nameComponents = userName.split(NAME_DELIMITER);
- if (nameComponents.length != 3) {
- throw new IOException("Provided name '" + userName + "' has " +
- nameComponents.length + " components instead of the expected 3.");
- }
- int keyId = Integer.parseInt(nameComponents[0]);
- String blockPoolId = nameComponents[1];
- byte[] nonce = Base64.decodeBase64(nameComponents[2]);
- return blockPoolTokenSecretManager.retrieveDataEncryptionKey(keyId,
- blockPoolId, nonce);
- }
-
- private static char[] encryptionKeyToPassword(byte[] encryptionKey) {
- return new String(Base64.encodeBase64(encryptionKey, false), Charsets.UTF_8).toCharArray();
- }
-
- /**
- * Strongly inspired by Thrift's TSaslTransport class.
- *
- * Used to abstract over the SaslServer
and
- * SaslClient
classes, which share a lot of their interface, but
- * unfortunately don't share a common superclass.
- */
- private static class SaslParticipant {
- // One of these will always be null.
- public SaslServer saslServer;
- public SaslClient saslClient;
-
- public SaslParticipant(SaslServer saslServer) {
- this.saslServer = saslServer;
- }
-
- public SaslParticipant(SaslClient saslClient) {
- this.saslClient = saslClient;
- }
-
- public byte[] evaluateChallengeOrResponse(byte[] challengeOrResponse) throws SaslException {
- if (saslClient != null) {
- return saslClient.evaluateChallenge(challengeOrResponse);
- } else {
- return saslServer.evaluateResponse(challengeOrResponse);
- }
- }
-
- public boolean isComplete() {
- if (saslClient != null)
- return saslClient.isComplete();
- else
- return saslServer.isComplete();
- }
-
- public boolean supportsConfidentiality() {
- String qop = null;
- if (saslClient != null) {
- qop = (String) saslClient.getNegotiatedProperty(Sasl.QOP);
- } else {
- qop = (String) saslServer.getNegotiatedProperty(Sasl.QOP);
- }
- return qop != null && qop.equals("auth-conf");
- }
-
- // Return some input/output streams that will henceforth have their
- // communication encrypted.
- private IOStreamPair createEncryptedStreamPair(
- DataOutputStream out, DataInputStream in) {
- if (saslClient != null) {
- return new IOStreamPair(
- new SaslInputStream(in, saslClient),
- new SaslOutputStream(out, saslClient));
- } else {
- return new IOStreamPair(
- new SaslInputStream(in, saslServer),
- new SaslOutputStream(out, saslServer));
- }
- }
- }
-
- @InterfaceAudience.Private
- public static class InvalidMagicNumberException extends IOException {
-
- private static final long serialVersionUID = 1L;
-
- public InvalidMagicNumberException(int magicNumber) {
- super(String.format("Received %x instead of %x from client.",
- magicNumber, ENCRYPTED_TRANSFER_MAGIC_NUMBER));
- }
- }
-
-}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/DataEncryptionKeyFactory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/DataEncryptionKeyFactory.java
new file mode 100644
index 00000000000..959cba0fb48
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/DataEncryptionKeyFactory.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocol.datatransfer.sasl;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
+
+/**
+ * Creates a new {@link DataEncryptionKey} on demand.
+ */
+@InterfaceAudience.Private
+public interface DataEncryptionKeyFactory {
+
+ /**
+ * Creates a new DataEncryptionKey.
+ *
+ * @return DataEncryptionKey newly created
+ * @throws IOException for any error
+ */
+ DataEncryptionKey newDataEncryptionKey() throws IOException;
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/DataTransferSaslUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/DataTransferSaslUtil.java
new file mode 100644
index 00000000000..cd18b9fa0fa
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/DataTransferSaslUtil.java
@@ -0,0 +1,267 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocol.datatransfer.sasl;
+
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_RPC_PROTECTION;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_SASL_PROPS_RESOLVER_CLASS;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_TRANSFER_SASL_PROPS_RESOLVER_CLASS_KEY;
+import static org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.InetAddress;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Set;
+import javax.security.sasl.Sasl;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.net.Peer;
+import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto.DataTransferEncryptorStatus;
+import org.apache.hadoop.security.SaslPropertiesResolver;
+import org.apache.hadoop.security.SaslRpcServer.QualityOfProtection;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Charsets;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
+import com.google.common.net.InetAddresses;
+import com.google.protobuf.ByteString;
+
+/**
+ * Utility methods implementing SASL negotiation for DataTransferProtocol.
+ */
+@InterfaceAudience.Private
+public final class DataTransferSaslUtil {
+
+ private static final Logger LOG = LoggerFactory.getLogger(
+ DataTransferSaslUtil.class);
+
+ /**
+ * Delimiter for the three-part SASL username string.
+ */
+ public static final String NAME_DELIMITER = " ";
+
+ /**
+ * Sent by clients and validated by servers. We use a number that's unlikely
+ * to ever be sent as the value of the DATA_TRANSFER_VERSION.
+ */
+ public static final int SASL_TRANSFER_MAGIC_NUMBER = 0xDEADBEEF;
+
+ /**
+ * Checks that SASL negotiation has completed for the given participant, and
+ * the negotiated quality of protection is included in the given SASL
+ * properties and therefore acceptable.
+ *
+ * @param sasl participant to check
+ * @param saslProps properties of SASL negotiation
+ * @throws IOException for any error
+ */
+ public static void checkSaslComplete(SaslParticipant sasl,
+ Map saslProps) throws IOException {
+ if (!sasl.isComplete()) {
+ throw new IOException("Failed to complete SASL handshake");
+ }
+ Set requestedQop = ImmutableSet.copyOf(Arrays.asList(
+ saslProps.get(Sasl.QOP).split(",")));
+ String negotiatedQop = sasl.getNegotiatedQop();
+ LOG.debug("Verifying QOP, requested QOP = {}, negotiated QOP = {}",
+ requestedQop, negotiatedQop);
+ if (!requestedQop.contains(negotiatedQop)) {
+ throw new IOException(String.format("SASL handshake completed, but " +
+ "channel does not have acceptable quality of protection, " +
+ "requested = %s, negotiated = %s", requestedQop, negotiatedQop));
+ }
+ }
+
+ /**
+ * Creates SASL properties required for an encrypted SASL negotiation.
+ *
+ * @param encryptionAlgorithm to use for SASL negotation
+ * @return properties of encrypted SASL negotiation
+ */
+ public static Map createSaslPropertiesForEncryption(
+ String encryptionAlgorithm) {
+ Map saslProps = Maps.newHashMapWithExpectedSize(3);
+ saslProps.put(Sasl.QOP, QualityOfProtection.PRIVACY.getSaslQop());
+ saslProps.put(Sasl.SERVER_AUTH, "true");
+ saslProps.put("com.sun.security.sasl.digest.cipher", encryptionAlgorithm);
+ return saslProps;
+ }
+
+ /**
+ * For an encrypted SASL negotiation, encodes an encryption key to a SASL
+ * password.
+ *
+ * @param encryptionKey to encode
+ * @return key encoded as SASL password
+ */
+ public static char[] encryptionKeyToPassword(byte[] encryptionKey) {
+ return new String(Base64.encodeBase64(encryptionKey, false), Charsets.UTF_8)
+ .toCharArray();
+ }
+
+ /**
+ * Returns InetAddress from peer. The getRemoteAddressString has the form
+ * [host][/ip-address]:port. The host may be missing. The IP address (and
+ * preceding '/') may be missing. The port preceded by ':' is always present.
+ *
+ * @param peer
+ * @return InetAddress from peer
+ */
+ public static InetAddress getPeerAddress(Peer peer) {
+ String remoteAddr = peer.getRemoteAddressString().split(":")[0];
+ int slashIdx = remoteAddr.indexOf('/');
+ return InetAddresses.forString(slashIdx != -1 ?
+ remoteAddr.substring(slashIdx + 1, remoteAddr.length()) :
+ remoteAddr);
+ }
+
+ /**
+ * Creates a SaslPropertiesResolver from the given configuration. This method
+ * works by cloning the configuration, translating configuration properties
+ * specific to DataTransferProtocol to what SaslPropertiesResolver expects,
+ * and then delegating to SaslPropertiesResolver for initialization. This
+ * method returns null if SASL protection has not been configured for
+ * DataTransferProtocol.
+ *
+ * @param conf configuration to read
+ * @return SaslPropertiesResolver for DataTransferProtocol, or null if not
+ * configured
+ */
+ public static SaslPropertiesResolver getSaslPropertiesResolver(
+ Configuration conf) {
+ String qops = conf.get(DFS_DATA_TRANSFER_PROTECTION_KEY);
+ if (qops == null || qops.isEmpty()) {
+ LOG.debug("DataTransferProtocol not using SaslPropertiesResolver, no " +
+ "QOP found in configuration for {}", DFS_DATA_TRANSFER_PROTECTION_KEY);
+ return null;
+ }
+ Configuration saslPropsResolverConf = new Configuration(conf);
+ saslPropsResolverConf.set(HADOOP_RPC_PROTECTION, qops);
+ Class extends SaslPropertiesResolver> resolverClass = conf.getClass(
+ DFS_DATA_TRANSFER_SASL_PROPS_RESOLVER_CLASS_KEY,
+ SaslPropertiesResolver.class, SaslPropertiesResolver.class);
+ saslPropsResolverConf.setClass(HADOOP_SECURITY_SASL_PROPS_RESOLVER_CLASS,
+ resolverClass, SaslPropertiesResolver.class);
+ SaslPropertiesResolver resolver = SaslPropertiesResolver.getInstance(
+ saslPropsResolverConf);
+ LOG.debug("DataTransferProtocol using SaslPropertiesResolver, configured " +
+ "QOP {} = {}, configured class {} = {}", DFS_DATA_TRANSFER_PROTECTION_KEY, qops,
+ DFS_DATA_TRANSFER_SASL_PROPS_RESOLVER_CLASS_KEY, resolverClass);
+ return resolver;
+ }
+
+ /**
+ * Performs the first step of SASL negotiation.
+ *
+ * @param out connection output stream
+ * @param in connection input stream
+ * @param sasl participant
+ */
+ public static void performSaslStep1(OutputStream out, InputStream in,
+ SaslParticipant sasl) throws IOException {
+ byte[] remoteResponse = readSaslMessage(in);
+ byte[] localResponse = sasl.evaluateChallengeOrResponse(remoteResponse);
+ sendSaslMessage(out, localResponse);
+ }
+
+ /**
+ * Reads a SASL negotiation message.
+ *
+ * @param in stream to read
+ * @return bytes of SASL negotiation messsage
+ * @throws IOException for any error
+ */
+ public static byte[] readSaslMessage(InputStream in) throws IOException {
+ DataTransferEncryptorMessageProto proto =
+ DataTransferEncryptorMessageProto.parseFrom(vintPrefixed(in));
+ if (proto.getStatus() == DataTransferEncryptorStatus.ERROR_UNKNOWN_KEY) {
+ throw new InvalidEncryptionKeyException(proto.getMessage());
+ } else if (proto.getStatus() == DataTransferEncryptorStatus.ERROR) {
+ throw new IOException(proto.getMessage());
+ } else {
+ return proto.getPayload().toByteArray();
+ }
+ }
+
+ /**
+ * Sends a SASL negotiation message indicating an error.
+ *
+ * @param out stream to receive message
+ * @param message to send
+ * @throws IOException for any error
+ */
+ public static void sendGenericSaslErrorMessage(OutputStream out,
+ String message) throws IOException {
+ sendSaslMessage(out, DataTransferEncryptorStatus.ERROR, null, message);
+ }
+
+ /**
+ * Sends a SASL negotiation message.
+ *
+ * @param out stream to receive message
+ * @param payload to send
+ * @throws IOException for any error
+ */
+ public static void sendSaslMessage(OutputStream out, byte[] payload)
+ throws IOException {
+ sendSaslMessage(out, DataTransferEncryptorStatus.SUCCESS, payload, null);
+ }
+
+ /**
+ * Sends a SASL negotiation message.
+ *
+ * @param out stream to receive message
+ * @param status negotiation status
+ * @param payload to send
+ * @param message to send
+ * @throws IOException for any error
+ */
+ public static void sendSaslMessage(OutputStream out,
+ DataTransferEncryptorStatus status, byte[] payload, String message)
+ throws IOException {
+ DataTransferEncryptorMessageProto.Builder builder =
+ DataTransferEncryptorMessageProto.newBuilder();
+
+ builder.setStatus(status);
+ if (payload != null) {
+ builder.setPayload(ByteString.copyFrom(payload));
+ }
+ if (message != null) {
+ builder.setMessage(message);
+ }
+
+ DataTransferEncryptorMessageProto proto = builder.build();
+ proto.writeDelimitedTo(out);
+ out.flush();
+ }
+
+ /**
+ * There is no reason to instantiate this class.
+ */
+ private DataTransferSaslUtil() {
+ }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/InvalidMagicNumberException.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/InvalidMagicNumberException.java
new file mode 100644
index 00000000000..eb494a87e17
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/InvalidMagicNumberException.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocol.datatransfer.sasl;
+
+import static org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil.SASL_TRANSFER_MAGIC_NUMBER;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * Indicates that SASL protocol negotiation expected to read a pre-defined magic
+ * number, but the expected value was not seen.
+ */
+@InterfaceAudience.Private
+public class InvalidMagicNumberException extends IOException {
+
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * Creates a new InvalidMagicNumberException.
+ *
+ * @param magicNumber expected value
+ */
+ public InvalidMagicNumberException(int magicNumber) {
+ super(String.format("Received %x instead of %x from client.",
+ magicNumber, SASL_TRANSFER_MAGIC_NUMBER));
+ }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferClient.java
new file mode 100644
index 00000000000..643af4a9fb1
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferClient.java
@@ -0,0 +1,439 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocol.datatransfer.sasl;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY;
+import static org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil.*;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.InetAddress;
+import java.net.Socket;
+import java.util.Map;
+
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.NameCallback;
+import javax.security.auth.callback.PasswordCallback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.sasl.RealmCallback;
+import javax.security.sasl.RealmChoiceCallback;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.net.EncryptedPeer;
+import org.apache.hadoop.hdfs.net.Peer;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
+import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
+import org.apache.hadoop.security.SaslPropertiesResolver;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Charsets;
+
+/**
+ * Negotiates SASL for DataTransferProtocol on behalf of a client. There are
+ * two possible supported variants of SASL negotiation: either a general-purpose
+ * negotiation supporting any quality of protection, or a specialized
+ * negotiation that enforces privacy as the quality of protection using a
+ * cryptographically strong encryption key.
+ *
+ * This class is used in both the HDFS client and the DataNode. The DataNode
+ * needs it, because it acts as a client to other DataNodes during write
+ * pipelines and block transfers.
+ */
+@InterfaceAudience.Private
+public class SaslDataTransferClient {
+
+ private static final Logger LOG = LoggerFactory.getLogger(
+ SaslDataTransferClient.class);
+
+ private final boolean fallbackToSimpleAuthAllowed;
+ private final SaslPropertiesResolver saslPropsResolver;
+ private final TrustedChannelResolver trustedChannelResolver;
+
+ /**
+ * Creates a new SaslDataTransferClient.
+ *
+ * @param saslPropsResolver for determining properties of SASL negotiation
+ * @param trustedChannelResolver for identifying trusted connections that do
+ * not require SASL negotiation
+ */
+ public SaslDataTransferClient(SaslPropertiesResolver saslPropsResolver,
+ TrustedChannelResolver trustedChannelResolver,
+ boolean fallbackToSimpleAuthAllowed) {
+ this.fallbackToSimpleAuthAllowed = fallbackToSimpleAuthAllowed;
+ this.saslPropsResolver = saslPropsResolver;
+ this.trustedChannelResolver = trustedChannelResolver;
+ }
+
+ /**
+ * Sends client SASL negotiation for a newly allocated socket if required.
+ *
+ * @param socket connection socket
+ * @param underlyingOut connection output stream
+ * @param underlyingIn connection input stream
+ * @param encryptionKeyFactory for creation of an encryption key
+ * @param accessToken connection block access token
+ * @param datanodeId ID of destination DataNode
+ * @return new pair of streams, wrapped after SASL negotiation
+ * @throws IOException for any error
+ */
+ public IOStreamPair newSocketSend(Socket socket, OutputStream underlyingOut,
+ InputStream underlyingIn, DataEncryptionKeyFactory encryptionKeyFactory,
+ Token accessToken, DatanodeID datanodeId)
+ throws IOException {
+ // The encryption key factory only returns a key if encryption is enabled.
+ DataEncryptionKey encryptionKey = !trustedChannelResolver.isTrusted() ?
+ encryptionKeyFactory.newDataEncryptionKey() : null;
+ IOStreamPair ios = send(socket.getInetAddress(), underlyingOut,
+ underlyingIn, encryptionKey, accessToken, datanodeId);
+ return ios != null ? ios : new IOStreamPair(underlyingIn, underlyingOut);
+ }
+
+ /**
+ * Sends client SASL negotiation for a peer if required.
+ *
+ * @param peer connection peer
+ * @param encryptionKeyFactory for creation of an encryption key
+ * @param accessToken connection block access token
+ * @param datanodeId ID of destination DataNode
+ * @return new pair of streams, wrapped after SASL negotiation
+ * @throws IOException for any error
+ */
+ public Peer peerSend(Peer peer, DataEncryptionKeyFactory encryptionKeyFactory,
+ Token accessToken, DatanodeID datanodeId)
+ throws IOException {
+ IOStreamPair ios = checkTrustAndSend(getPeerAddress(peer),
+ peer.getOutputStream(), peer.getInputStream(), encryptionKeyFactory,
+ accessToken, datanodeId);
+ // TODO: Consider renaming EncryptedPeer to SaslPeer.
+ return ios != null ? new EncryptedPeer(peer, ios) : peer;
+ }
+
+ /**
+ * Sends client SASL negotiation for a socket if required.
+ *
+ * @param socket connection socket
+ * @param underlyingOut connection output stream
+ * @param underlyingIn connection input stream
+ * @param encryptionKeyFactory for creation of an encryption key
+ * @param accessToken connection block access token
+ * @param datanodeId ID of destination DataNode
+ * @return new pair of streams, wrapped after SASL negotiation
+ * @throws IOException for any error
+ */
+ public IOStreamPair socketSend(Socket socket, OutputStream underlyingOut,
+ InputStream underlyingIn, DataEncryptionKeyFactory encryptionKeyFactory,
+ Token accessToken, DatanodeID datanodeId)
+ throws IOException {
+ IOStreamPair ios = checkTrustAndSend(socket.getInetAddress(), underlyingOut,
+ underlyingIn, encryptionKeyFactory, accessToken, datanodeId);
+ return ios != null ? ios : new IOStreamPair(underlyingIn, underlyingOut);
+ }
+
+ /**
+ * Checks if an address is already trusted and then sends client SASL
+ * negotiation if required.
+ *
+ * @param addr connection address
+ * @param underlyingOut connection output stream
+ * @param underlyingIn connection input stream
+ * @param encryptionKeyFactory for creation of an encryption key
+ * @param accessToken connection block access token
+ * @param datanodeId ID of destination DataNode
+ * @return new pair of streams, wrapped after SASL negotiation
+ * @throws IOException for any error
+ */
+ private IOStreamPair checkTrustAndSend(InetAddress addr,
+ OutputStream underlyingOut, InputStream underlyingIn,
+ DataEncryptionKeyFactory encryptionKeyFactory,
+ Token accessToken, DatanodeID datanodeId)
+ throws IOException {
+ if (!trustedChannelResolver.isTrusted() &&
+ !trustedChannelResolver.isTrusted(addr)) {
+ // The encryption key factory only returns a key if encryption is enabled.
+ DataEncryptionKey encryptionKey =
+ encryptionKeyFactory.newDataEncryptionKey();
+ return send(addr, underlyingOut, underlyingIn, encryptionKey, accessToken,
+ datanodeId);
+ } else {
+ LOG.debug(
+ "SASL client skipping handshake on trusted connection for addr = {}, "
+ + "datanodeId = {}", addr, datanodeId);
+ return null;
+ }
+ }
+
+ /**
+ * Sends client SASL negotiation if required. Determines the correct type of
+ * SASL handshake based on configuration.
+ *
+ * @param addr connection address
+ * @param underlyingOut connection output stream
+ * @param underlyingIn connection input stream
+ * @param encryptionKey for an encrypted SASL handshake
+ * @param accessToken connection block access token
+ * @param datanodeId ID of destination DataNode
+ * @return new pair of streams, wrapped after SASL negotiation
+ * @throws IOException for any error
+ */
+ private IOStreamPair send(InetAddress addr, OutputStream underlyingOut,
+ InputStream underlyingIn, DataEncryptionKey encryptionKey,
+ Token accessToken, DatanodeID datanodeId)
+ throws IOException {
+ if (encryptionKey != null) {
+ LOG.debug(
+ "SASL client doing encrypted handshake for addr = {}, datanodeId = {}",
+ addr, datanodeId);
+ return getEncryptedStreams(underlyingOut, underlyingIn,
+ encryptionKey);
+ } else if (!UserGroupInformation.isSecurityEnabled()) {
+ LOG.debug(
+ "SASL client skipping handshake in unsecured configuration for "
+ + "addr = {}, datanodeId = {}", addr, datanodeId);
+ return null;
+ } else if (datanodeId.getXferPort() < 1024) {
+ LOG.debug(
+ "SASL client skipping handshake in secured configuration with "
+ + "privileged port for addr = {}, datanodeId = {}", addr, datanodeId);
+ return null;
+ } else if (accessToken.getIdentifier().length == 0) {
+ if (!fallbackToSimpleAuthAllowed) {
+ throw new IOException(
+ "No block access token was provided (insecure cluster), but this " +
+ "client is configured to allow only secure connections.");
+ }
+ LOG.debug(
+ "SASL client skipping handshake in secured configuration with "
+ + "unsecured cluster for addr = {}, datanodeId = {}", addr, datanodeId);
+ return null;
+ } else {
+ LOG.debug(
+ "SASL client doing general handshake for addr = {}, datanodeId = {}",
+ addr, datanodeId);
+ return getSaslStreams(addr, underlyingOut, underlyingIn, accessToken,
+ datanodeId);
+ }
+ }
+
+ /**
+ * Sends client SASL negotiation for specialized encrypted handshake.
+ *
+ * @param underlyingOut connection output stream
+ * @param underlyingIn connection input stream
+ * @param encryptionKey for an encrypted SASL handshake
+ * @return new pair of streams, wrapped after SASL negotiation
+ * @throws IOException for any error
+ */
+ private IOStreamPair getEncryptedStreams(OutputStream underlyingOut,
+ InputStream underlyingIn, DataEncryptionKey encryptionKey)
+ throws IOException {
+ Map saslProps = createSaslPropertiesForEncryption(
+ encryptionKey.encryptionAlgorithm);
+
+ LOG.debug("Client using encryption algorithm {}",
+ encryptionKey.encryptionAlgorithm);
+
+ String userName = getUserNameFromEncryptionKey(encryptionKey);
+ char[] password = encryptionKeyToPassword(encryptionKey.encryptionKey);
+ CallbackHandler callbackHandler = new SaslClientCallbackHandler(userName,
+ password);
+ return doSaslHandshake(underlyingOut, underlyingIn, userName, saslProps,
+ callbackHandler);
+ }
+
+ /**
+ * The SASL username for an encrypted handshake consists of the keyId,
+ * blockPoolId, and nonce with the first two encoded as Strings, and the third
+ * encoded using Base64. The fields are each separated by a single space.
+ *
+ * @param encryptionKey the encryption key to encode as a SASL username.
+ * @return encoded username containing keyId, blockPoolId, and nonce
+ */
+ private static String getUserNameFromEncryptionKey(
+ DataEncryptionKey encryptionKey) {
+ return encryptionKey.keyId + NAME_DELIMITER +
+ encryptionKey.blockPoolId + NAME_DELIMITER +
+ new String(Base64.encodeBase64(encryptionKey.nonce, false), Charsets.UTF_8);
+ }
+
+ /**
+ * Sets user name and password when asked by the client-side SASL object.
+ */
+ private static final class SaslClientCallbackHandler
+ implements CallbackHandler {
+
+ private final char[] password;
+ private final String userName;
+
+ /**
+ * Creates a new SaslClientCallbackHandler.
+ *
+ * @param userName SASL user name
+ * @Param password SASL password
+ */
+ public SaslClientCallbackHandler(String userName, char[] password) {
+ this.password = password;
+ this.userName = userName;
+ }
+
+ @Override
+ public void handle(Callback[] callbacks) throws IOException,
+ UnsupportedCallbackException {
+ NameCallback nc = null;
+ PasswordCallback pc = null;
+ RealmCallback rc = null;
+ for (Callback callback : callbacks) {
+ if (callback instanceof RealmChoiceCallback) {
+ continue;
+ } else if (callback instanceof NameCallback) {
+ nc = (NameCallback) callback;
+ } else if (callback instanceof PasswordCallback) {
+ pc = (PasswordCallback) callback;
+ } else if (callback instanceof RealmCallback) {
+ rc = (RealmCallback) callback;
+ } else {
+ throw new UnsupportedCallbackException(callback,
+ "Unrecognized SASL client callback");
+ }
+ }
+ if (nc != null) {
+ nc.setName(userName);
+ }
+ if (pc != null) {
+ pc.setPassword(password);
+ }
+ if (rc != null) {
+ rc.setText(rc.getDefaultText());
+ }
+ }
+ }
+
+ /**
+ * Sends client SASL negotiation for general-purpose handshake.
+ *
+ * @param addr connection address
+ * @param underlyingOut connection output stream
+ * @param underlyingIn connection input stream
+ * @param accessToken connection block access token
+ * @param datanodeId ID of destination DataNode
+ * @return new pair of streams, wrapped after SASL negotiation
+ * @throws IOException for any error
+ */
+ private IOStreamPair getSaslStreams(InetAddress addr,
+ OutputStream underlyingOut, InputStream underlyingIn,
+ Token accessToken, DatanodeID datanodeId)
+ throws IOException {
+ if (saslPropsResolver == null) {
+ throw new IOException(String.format("Cannot create a secured " +
+ "connection if DataNode listens on unprivileged port (%d) and no " +
+ "protection is defined in configuration property %s.",
+ datanodeId.getXferPort(), DFS_DATA_TRANSFER_PROTECTION_KEY));
+ }
+ Map saslProps = saslPropsResolver.getClientProperties(addr);
+
+ String userName = buildUserName(accessToken);
+ char[] password = buildClientPassword(accessToken);
+ CallbackHandler callbackHandler = new SaslClientCallbackHandler(userName,
+ password);
+ return doSaslHandshake(underlyingOut, underlyingIn, userName, saslProps,
+ callbackHandler);
+ }
+
+ /**
+ * Builds the client's user name for the general-purpose handshake, consisting
+ * of the base64-encoded serialized block access token identifier. Note that
+ * this includes only the token identifier, not the token itself, which would
+ * include the password. The password is a shared secret, and we must not
+ * write it on the network during the SASL authentication exchange.
+ *
+ * @param blockToken for block access
+ * @return SASL user name
+ */
+ private static String buildUserName(Token blockToken) {
+ return new String(Base64.encodeBase64(blockToken.getIdentifier(), false),
+ Charsets.UTF_8);
+ }
+
+ /**
+ * Calculates the password on the client side for the general-purpose
+ * handshake. The password consists of the block access token's password.
+ *
+ * @param blockToken for block access
+ * @return SASL password
+ */
+ private char[] buildClientPassword(Token blockToken) {
+ return new String(Base64.encodeBase64(blockToken.getPassword(), false),
+ Charsets.UTF_8).toCharArray();
+ }
+
+ /**
+ * This method actually executes the client-side SASL handshake.
+ *
+ * @param underlyingOut connection output stream
+ * @param underlyingIn connection input stream
+ * @param userName SASL user name
+ * @param saslProps properties of SASL negotiation
+ * @param callbackHandler for responding to SASL callbacks
+ * @return new pair of streams, wrapped after SASL negotiation
+ * @throws IOException for any error
+ */
+ private IOStreamPair doSaslHandshake(OutputStream underlyingOut,
+ InputStream underlyingIn, String userName, Map saslProps,
+ CallbackHandler callbackHandler) throws IOException {
+
+ DataOutputStream out = new DataOutputStream(underlyingOut);
+ DataInputStream in = new DataInputStream(underlyingIn);
+
+ SaslParticipant sasl= SaslParticipant.createClientSaslParticipant(userName,
+ saslProps, callbackHandler);
+
+ out.writeInt(SASL_TRANSFER_MAGIC_NUMBER);
+ out.flush();
+
+ try {
+ // Start of handshake - "initial response" in SASL terminology.
+ sendSaslMessage(out, new byte[0]);
+
+ // step 1
+ performSaslStep1(out, in, sasl);
+
+ // step 2 (client-side only)
+ byte[] remoteResponse = readSaslMessage(in);
+ byte[] localResponse = sasl.evaluateChallengeOrResponse(remoteResponse);
+ assert localResponse == null;
+
+ // SASL handshake is complete
+ checkSaslComplete(sasl, saslProps);
+
+ return sasl.createStreamPair(out, in);
+ } catch (IOException ioe) {
+ sendGenericSaslErrorMessage(out, ioe.getMessage());
+ throw ioe;
+ }
+ }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferServer.java
new file mode 100644
index 00000000000..78570579323
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferServer.java
@@ -0,0 +1,381 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocol.datatransfer.sasl;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY;
+import static org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil.*;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Map;
+
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.NameCallback;
+import javax.security.auth.callback.PasswordCallback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.sasl.AuthorizeCallback;
+import javax.security.sasl.RealmCallback;
+import javax.security.sasl.SaslException;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.net.Peer;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
+import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto.DataTransferEncryptorStatus;
+import org.apache.hadoop.hdfs.security.token.block.BlockPoolTokenSecretManager;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.server.datanode.DNConf;
+import org.apache.hadoop.security.SaslPropertiesResolver;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Charsets;
+
+/**
+ * Negotiates SASL for DataTransferProtocol on behalf of a server. There are
+ * two possible supported variants of SASL negotiation: either a general-purpose
+ * negotiation supporting any quality of protection, or a specialized
+ * negotiation that enforces privacy as the quality of protection using a
+ * cryptographically strong encryption key.
+ *
+ * This class is used in the DataNode for handling inbound connections.
+ */
+@InterfaceAudience.Private
+public class SaslDataTransferServer {
+
+ private static final Logger LOG = LoggerFactory.getLogger(
+ SaslDataTransferServer.class);
+
+ private final BlockPoolTokenSecretManager blockPoolTokenSecretManager;
+ private final DNConf dnConf;
+
+ /**
+ * Creates a new SaslDataTransferServer.
+ *
+ * @param dnConf configuration of DataNode
+ * @param blockPoolTokenSecretManager used for checking block access tokens
+ * and encryption keys
+ */
+ public SaslDataTransferServer(DNConf dnConf,
+ BlockPoolTokenSecretManager blockPoolTokenSecretManager) {
+ this.blockPoolTokenSecretManager = blockPoolTokenSecretManager;
+ this.dnConf = dnConf;
+ }
+
+ /**
+ * Receives SASL negotiation from a peer on behalf of a server.
+ *
+ * @param peer connection peer
+ * @param underlyingOut connection output stream
+ * @param underlyingIn connection input stream
+ * @param datanodeId ID of DataNode accepting connection
+ * @return new pair of streams, wrapped after SASL negotiation
+ * @throws IOException for any error
+ */
+ public IOStreamPair receive(Peer peer, OutputStream underlyingOut,
+ InputStream underlyingIn, DatanodeID datanodeId) throws IOException {
+ if (dnConf.getEncryptDataTransfer()) {
+ LOG.debug(
+ "SASL server doing encrypted handshake for peer = {}, datanodeId = {}",
+ peer, datanodeId);
+ return getEncryptedStreams(peer, underlyingOut, underlyingIn);
+ } else if (!UserGroupInformation.isSecurityEnabled()) {
+ LOG.debug(
+ "SASL server skipping handshake in unsecured configuration for "
+ + "peer = {}, datanodeId = {}", peer, datanodeId);
+ return new IOStreamPair(underlyingIn, underlyingOut);
+ } else if (datanodeId.getXferPort() < 1024) {
+ LOG.debug(
+ "SASL server skipping handshake in unsecured configuration for "
+ + "peer = {}, datanodeId = {}", peer, datanodeId);
+ return new IOStreamPair(underlyingIn, underlyingOut);
+ } else {
+ LOG.debug(
+ "SASL server doing general handshake for peer = {}, datanodeId = {}",
+ peer, datanodeId);
+ return getSaslStreams(peer, underlyingOut, underlyingIn, datanodeId);
+ }
+ }
+
+ /**
+ * Receives SASL negotiation for specialized encrypted handshake.
+ *
+ * @param peer connection peer
+ * @param underlyingOut connection output stream
+ * @param underlyingIn connection input stream
+ * @return new pair of streams, wrapped after SASL negotiation
+ * @throws IOException for any error
+ */
+ private IOStreamPair getEncryptedStreams(Peer peer,
+ OutputStream underlyingOut, InputStream underlyingIn) throws IOException {
+ if (peer.hasSecureChannel() ||
+ dnConf.getTrustedChannelResolver().isTrusted(getPeerAddress(peer))) {
+ return new IOStreamPair(underlyingIn, underlyingOut);
+ }
+
+ Map saslProps = createSaslPropertiesForEncryption(
+ dnConf.getEncryptionAlgorithm());
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Server using encryption algorithm " +
+ dnConf.getEncryptionAlgorithm());
+ }
+
+ CallbackHandler callbackHandler = new SaslServerCallbackHandler(
+ new PasswordFunction() {
+ @Override
+ public char[] apply(String userName) throws IOException {
+ return encryptionKeyToPassword(getEncryptionKeyFromUserName(userName));
+ }
+ });
+ return doSaslHandshake(underlyingOut, underlyingIn, saslProps,
+ callbackHandler);
+ }
+
+ /**
+ * The SASL handshake for encrypted vs. general-purpose uses different logic
+ * for determining the password. This interface is used to parameterize that
+ * logic. It's similar to a Guava Function, but we need to let it throw
+ * exceptions.
+ */
+ private interface PasswordFunction {
+
+ /**
+ * Returns the SASL password for the given user name.
+ *
+ * @param userName SASL user name
+ * @return SASL password
+ * @throws IOException for any error
+ */
+ char[] apply(String userName) throws IOException;
+ }
+
+ /**
+ * Sets user name and password when asked by the server-side SASL object.
+ */
+ private static final class SaslServerCallbackHandler
+ implements CallbackHandler {
+
+ private final PasswordFunction passwordFunction;
+
+ /**
+ * Creates a new SaslServerCallbackHandler.
+ *
+ * @param passwordFunction for determing the user's password
+ */
+ public SaslServerCallbackHandler(PasswordFunction passwordFunction) {
+ this.passwordFunction = passwordFunction;
+ }
+
+ @Override
+ public void handle(Callback[] callbacks) throws IOException,
+ UnsupportedCallbackException {
+ NameCallback nc = null;
+ PasswordCallback pc = null;
+ AuthorizeCallback ac = null;
+ for (Callback callback : callbacks) {
+ if (callback instanceof AuthorizeCallback) {
+ ac = (AuthorizeCallback) callback;
+ } else if (callback instanceof PasswordCallback) {
+ pc = (PasswordCallback) callback;
+ } else if (callback instanceof NameCallback) {
+ nc = (NameCallback) callback;
+ } else if (callback instanceof RealmCallback) {
+ continue; // realm is ignored
+ } else {
+ throw new UnsupportedCallbackException(callback,
+ "Unrecognized SASL DIGEST-MD5 Callback: " + callback);
+ }
+ }
+
+ if (pc != null) {
+ pc.setPassword(passwordFunction.apply(nc.getDefaultName()));
+ }
+
+ if (ac != null) {
+ ac.setAuthorized(true);
+ ac.setAuthorizedID(ac.getAuthorizationID());
+ }
+ }
+ }
+
+ /**
+ * Given a secret manager and a username encoded for the encrypted handshake,
+ * determine the encryption key.
+ *
+ * @param userName containing the keyId, blockPoolId, and nonce.
+ * @return secret encryption key.
+ * @throws IOException
+ */
+ private byte[] getEncryptionKeyFromUserName(String userName)
+ throws IOException {
+ String[] nameComponents = userName.split(NAME_DELIMITER);
+ if (nameComponents.length != 3) {
+ throw new IOException("Provided name '" + userName + "' has " +
+ nameComponents.length + " components instead of the expected 3.");
+ }
+ int keyId = Integer.parseInt(nameComponents[0]);
+ String blockPoolId = nameComponents[1];
+ byte[] nonce = Base64.decodeBase64(nameComponents[2]);
+ return blockPoolTokenSecretManager.retrieveDataEncryptionKey(keyId,
+ blockPoolId, nonce);
+ }
+
+ /**
+ * Receives SASL negotiation for general-purpose handshake.
+ *
+ * @param peer connection peer
+ * @param underlyingOut connection output stream
+ * @param underlyingIn connection input stream
+ * @param datanodeId ID of DataNode accepting connection
+ * @return new pair of streams, wrapped after SASL negotiation
+ * @throws IOException for any error
+ */
+ private IOStreamPair getSaslStreams(Peer peer, OutputStream underlyingOut,
+ InputStream underlyingIn, final DatanodeID datanodeId) throws IOException {
+ SaslPropertiesResolver saslPropsResolver = dnConf.getSaslPropsResolver();
+ if (saslPropsResolver == null) {
+ throw new IOException(String.format("Cannot create a secured " +
+ "connection if DataNode listens on unprivileged port (%d) and no " +
+ "protection is defined in configuration property %s.",
+ datanodeId.getXferPort(), DFS_DATA_TRANSFER_PROTECTION_KEY));
+ }
+ Map saslProps = saslPropsResolver.getServerProperties(
+ getPeerAddress(peer));
+
+ CallbackHandler callbackHandler = new SaslServerCallbackHandler(
+ new PasswordFunction() {
+ @Override
+ public char[] apply(String userName) throws IOException {
+ return buildServerPassword(userName);
+ }
+ });
+ return doSaslHandshake(underlyingOut, underlyingIn, saslProps,
+ callbackHandler);
+ }
+
+ /**
+ * Calculates the expected correct password on the server side for the
+ * general-purpose handshake. The password consists of the block access
+ * token's password (known to the DataNode via its secret manager). This
+ * expects that the client has supplied a user name consisting of its
+ * serialized block access token identifier.
+ *
+ * @param userName SASL user name containing serialized block access token
+ * identifier
+ * @return expected correct SASL password
+ * @throws IOException for any error
+ */
+ private char[] buildServerPassword(String userName) throws IOException {
+ BlockTokenIdentifier identifier = deserializeIdentifier(userName);
+ byte[] tokenPassword = blockPoolTokenSecretManager.retrievePassword(
+ identifier);
+ return (new String(Base64.encodeBase64(tokenPassword, false),
+ Charsets.UTF_8)).toCharArray();
+ }
+
+ /**
+ * Deserializes a base64-encoded binary representation of a block access
+ * token.
+ *
+ * @param str String to deserialize
+ * @return BlockTokenIdentifier deserialized from str
+ * @throws IOException if there is any I/O error
+ */
+ private BlockTokenIdentifier deserializeIdentifier(String str)
+ throws IOException {
+ BlockTokenIdentifier identifier = new BlockTokenIdentifier();
+ identifier.readFields(new DataInputStream(new ByteArrayInputStream(
+ Base64.decodeBase64(str))));
+ return identifier;
+ }
+
+ /**
+ * This method actually executes the server-side SASL handshake.
+ *
+ * @param underlyingOut connection output stream
+ * @param underlyingIn connection input stream
+ * @param saslProps properties of SASL negotiation
+ * @param callbackHandler for responding to SASL callbacks
+ * @return new pair of streams, wrapped after SASL negotiation
+ * @throws IOException for any error
+ */
+ private IOStreamPair doSaslHandshake(OutputStream underlyingOut,
+ InputStream underlyingIn, Map saslProps,
+ CallbackHandler callbackHandler) throws IOException {
+
+ DataInputStream in = new DataInputStream(underlyingIn);
+ DataOutputStream out = new DataOutputStream(underlyingOut);
+
+ SaslParticipant sasl = SaslParticipant.createServerSaslParticipant(saslProps,
+ callbackHandler);
+
+ int magicNumber = in.readInt();
+ if (magicNumber != SASL_TRANSFER_MAGIC_NUMBER) {
+ throw new InvalidMagicNumberException(magicNumber);
+ }
+ try {
+ // step 1
+ performSaslStep1(out, in, sasl);
+
+ // step 2 (server-side only)
+ byte[] remoteResponse = readSaslMessage(in);
+ byte[] localResponse = sasl.evaluateChallengeOrResponse(remoteResponse);
+ sendSaslMessage(out, localResponse);
+
+ // SASL handshake is complete
+ checkSaslComplete(sasl, saslProps);
+
+ return sasl.createStreamPair(out, in);
+ } catch (IOException ioe) {
+ if (ioe instanceof SaslException &&
+ ioe.getCause() != null &&
+ ioe.getCause() instanceof InvalidEncryptionKeyException) {
+ // This could just be because the client is long-lived and hasn't gotten
+ // a new encryption key from the NN in a while. Upon receiving this
+ // error, the client will get a new encryption key from the NN and retry
+ // connecting to this DN.
+ sendInvalidKeySaslErrorMessage(out, ioe.getCause().getMessage());
+ } else {
+ sendGenericSaslErrorMessage(out, ioe.getMessage());
+ }
+ throw ioe;
+ }
+ }
+
+ /**
+ * Sends a SASL negotiation message indicating an invalid key error.
+ *
+ * @param out stream to receive message
+ * @param message to send
+ * @throws IOException for any error
+ */
+ private static void sendInvalidKeySaslErrorMessage(DataOutputStream out,
+ String message) throws IOException {
+ sendSaslMessage(out, DataTransferEncryptorStatus.ERROR_UNKNOWN_KEY, null,
+ message);
+ }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslParticipant.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslParticipant.java
new file mode 100644
index 00000000000..106e297d5c8
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslParticipant.java
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocol.datatransfer.sasl;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.util.Map;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslException;
+import javax.security.sasl.SaslServer;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
+import org.apache.hadoop.security.SaslInputStream;
+import org.apache.hadoop.security.SaslOutputStream;
+
+/**
+ * Strongly inspired by Thrift's TSaslTransport class.
+ *
+ * Used to abstract over the SaslServer
and
+ * SaslClient
classes, which share a lot of their interface, but
+ * unfortunately don't share a common superclass.
+ */
+@InterfaceAudience.Private
+class SaslParticipant {
+
+ // This has to be set as part of the SASL spec, but it don't matter for
+ // our purposes, but may not be empty. It's sent over the wire, so use
+ // a short string.
+ private static final String SERVER_NAME = "0";
+ private static final String PROTOCOL = "hdfs";
+ private static final String MECHANISM = "DIGEST-MD5";
+
+ // One of these will always be null.
+ private final SaslServer saslServer;
+ private final SaslClient saslClient;
+
+ /**
+ * Creates a SaslParticipant wrapping a SaslServer.
+ *
+ * @param saslProps properties of SASL negotiation
+ * @param callbackHandler for handling all SASL callbacks
+ * @return SaslParticipant wrapping SaslServer
+ * @throws SaslException for any error
+ */
+ public static SaslParticipant createServerSaslParticipant(
+ Map saslProps, CallbackHandler callbackHandler)
+ throws SaslException {
+ return new SaslParticipant(Sasl.createSaslServer(MECHANISM,
+ PROTOCOL, SERVER_NAME, saslProps, callbackHandler));
+ }
+
+ /**
+ * Creates a SaslParticipant wrapping a SaslClient.
+ *
+ * @param userName SASL user name
+ * @param saslProps properties of SASL negotiation
+ * @param callbackHandler for handling all SASL callbacks
+ * @return SaslParticipant wrapping SaslClient
+ * @throws SaslException for any error
+ */
+ public static SaslParticipant createClientSaslParticipant(String userName,
+ Map saslProps, CallbackHandler callbackHandler)
+ throws SaslException {
+ return new SaslParticipant(Sasl.createSaslClient(new String[] { MECHANISM },
+ userName, PROTOCOL, SERVER_NAME, saslProps, callbackHandler));
+ }
+
+ /**
+ * Private constructor wrapping a SaslServer.
+ *
+ * @param saslServer to wrap
+ */
+ private SaslParticipant(SaslServer saslServer) {
+ this.saslServer = saslServer;
+ this.saslClient = null;
+ }
+
+ /**
+ * Private constructor wrapping a SaslClient.
+ *
+ * @param saslClient to wrap
+ */
+ private SaslParticipant(SaslClient saslClient) {
+ this.saslServer = null;
+ this.saslClient = saslClient;
+ }
+
+ /**
+ * @see {@link SaslServer#evaluateResponse}
+ * @see {@link SaslClient#evaluateChallenge}
+ */
+ public byte[] evaluateChallengeOrResponse(byte[] challengeOrResponse)
+ throws SaslException {
+ if (saslClient != null) {
+ return saslClient.evaluateChallenge(challengeOrResponse);
+ } else {
+ return saslServer.evaluateResponse(challengeOrResponse);
+ }
+ }
+
+ /**
+ * After successful SASL negotation, returns the negotiated quality of
+ * protection.
+ *
+ * @return negotiated quality of protection
+ */
+ public String getNegotiatedQop() {
+ if (saslClient != null) {
+ return (String) saslClient.getNegotiatedProperty(Sasl.QOP);
+ } else {
+ return (String) saslServer.getNegotiatedProperty(Sasl.QOP);
+ }
+ }
+
+ /**
+ * Returns true if SASL negotiation is complete.
+ *
+ * @return true if SASL negotiation is complete
+ */
+ public boolean isComplete() {
+ if (saslClient != null) {
+ return saslClient.isComplete();
+ } else {
+ return saslServer.isComplete();
+ }
+ }
+
+ /**
+ * Return some input/output streams that may henceforth have their
+ * communication encrypted, depending on the negotiated quality of protection.
+ *
+ * @param out output stream to wrap
+ * @param in input stream to wrap
+ * @return IOStreamPair wrapping the streams
+ */
+ public IOStreamPair createStreamPair(DataOutputStream out,
+ DataInputStream in) {
+ if (saslClient != null) {
+ return new IOStreamPair(
+ new SaslInputStream(in, saslClient),
+ new SaslOutputStream(out, saslClient));
+ } else {
+ return new IOStreamPair(
+ new SaslInputStream(in, saslServer),
+ new SaslOutputStream(out, saslServer));
+ }
+ }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
index 1a590a912c5..93e240c3e63 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
@@ -18,6 +18,8 @@
package org.apache.hadoop.hdfs.server.balancer;
import static com.google.common.base.Preconditions.checkArgument;
+import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT;
+import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY;
import static org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed;
import java.io.BufferedInputStream;
@@ -62,9 +64,11 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
-import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor;
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
+import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
@@ -202,6 +206,7 @@ public class Balancer {
private final NameNodeConnector nnc;
private final BalancingPolicy policy;
+ private final SaslDataTransferClient saslClient;
private final double threshold;
// all data node lists
@@ -352,19 +357,18 @@ public class Balancer {
OutputStream unbufOut = sock.getOutputStream();
InputStream unbufIn = sock.getInputStream();
- if (nnc.getDataEncryptionKey() != null) {
- IOStreamPair encryptedStreams =
- DataTransferEncryptor.getEncryptedStreams(
- unbufOut, unbufIn, nnc.getDataEncryptionKey());
- unbufOut = encryptedStreams.out;
- unbufIn = encryptedStreams.in;
- }
+ ExtendedBlock eb = new ExtendedBlock(nnc.blockpoolID, block.getBlock());
+ Token accessToken = nnc.getAccessToken(eb);
+ IOStreamPair saslStreams = saslClient.socketSend(sock, unbufOut,
+ unbufIn, nnc, accessToken, target.datanode);
+ unbufOut = saslStreams.out;
+ unbufIn = saslStreams.in;
out = new DataOutputStream(new BufferedOutputStream(unbufOut,
HdfsConstants.IO_FILE_BUFFER_SIZE));
in = new DataInputStream(new BufferedInputStream(unbufIn,
HdfsConstants.IO_FILE_BUFFER_SIZE));
- sendRequest(out);
+ sendRequest(out, eb, accessToken);
receiveResponse(in);
bytesMoved.addAndGet(block.getNumBytes());
LOG.info("Successfully moved " + this);
@@ -395,9 +399,8 @@ public class Balancer {
}
/* Send a block replace request to the output stream*/
- private void sendRequest(DataOutputStream out) throws IOException {
- final ExtendedBlock eb = new ExtendedBlock(nnc.blockpoolID, block.getBlock());
- final Token accessToken = nnc.getAccessToken(eb);
+ private void sendRequest(DataOutputStream out, ExtendedBlock eb,
+ Token accessToken) throws IOException {
new Sender(out).replaceBlock(eb, accessToken,
source.getStorageID(), proxySource.getDatanode());
}
@@ -876,6 +879,12 @@ public class Balancer {
this.maxConcurrentMovesPerNode =
conf.getInt(DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT);
+ this.saslClient = new SaslDataTransferClient(
+ DataTransferSaslUtil.getSaslPropertiesResolver(conf),
+ TrustedChannelResolver.getInstance(conf),
+ conf.getBoolean(
+ IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY,
+ IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT));
}
/* Given a data node set, build a network topology and decide
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
index 25784a26cfa..fb322763d76 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
@@ -34,8 +34,8 @@ import org.apache.hadoop.hdfs.NameNodeProxies;
import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory;
import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
-import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
@@ -50,7 +50,7 @@ import org.apache.hadoop.util.Daemon;
* The class provides utilities for {@link Balancer} to access a NameNode
*/
@InterfaceAudience.Private
-class NameNodeConnector {
+class NameNodeConnector implements DataEncryptionKeyFactory {
private static final Log LOG = Balancer.LOG;
private static final Path BALANCER_ID_PATH = new Path("/system/balancer.id");
private static final int MAX_NOT_CHANGED_ITERATIONS = 5;
@@ -72,7 +72,6 @@ class NameNodeConnector {
private BlockTokenSecretManager blockTokenSecretManager;
private Daemon keyupdaterthread; // AccessKeyUpdater thread
private DataEncryptionKey encryptionKey;
- private final TrustedChannelResolver trustedChannelResolver;
NameNodeConnector(URI nameNodeUri,
Configuration conf) throws IOException {
@@ -122,7 +121,6 @@ class NameNodeConnector {
if (out == null) {
throw new IOException("Another balancer is running");
}
- this.trustedChannelResolver = TrustedChannelResolver.getInstance(conf);
}
boolean shouldContinue(long dispatchBlockMoveBytes) {
@@ -154,10 +152,10 @@ class NameNodeConnector {
BlockTokenSecretManager.AccessMode.COPY));
}
}
-
- DataEncryptionKey getDataEncryptionKey()
- throws IOException {
- if (encryptDataTransfer && !this.trustedChannelResolver.isTrusted()) {
+
+ @Override
+ public DataEncryptionKey newDataEncryptionKey() {
+ if (encryptDataTransfer) {
synchronized (this) {
if (encryptionKey == null) {
encryptionKey = blockTokenSecretManager.generateDataEncryptionKey();
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java
index 6c0d23f3e49..acdda66b535 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java
@@ -58,8 +58,8 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
-import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
@@ -211,14 +211,16 @@ public class JspHelper {
}
public static void streamBlockInAscii(InetSocketAddress addr, String poolId,
- long blockId, Token blockToken, long genStamp,
+ long blockId, final Token blockToken, long genStamp,
long blockSize, long offsetIntoBlock, long chunkSizeToView,
JspWriter out, final Configuration conf, DFSClient.Conf dfsConf,
- final DataEncryptionKey encryptionKey)
+ final DFSClient dfs, final SaslDataTransferClient saslClient)
throws IOException {
if (chunkSizeToView == 0) return;
int amtToRead = (int)Math.min(chunkSizeToView, blockSize - offsetIntoBlock);
+ DatanodeID datanodeId = new DatanodeID(addr.getAddress().getHostAddress(),
+ addr.getHostName(), poolId, addr.getPort(), 0, 0, 0);
BlockReader blockReader = new BlockReaderFactory(dfsConf).
setInetSocketAddress(addr).
setBlock(new ExtendedBlock(poolId, blockId, 0, genStamp)).
@@ -229,21 +231,21 @@ public class JspHelper {
setVerifyChecksum(true).
setClientName("JspHelper").
setClientCacheContext(ClientContext.getFromConf(conf)).
- setDatanodeInfo(new DatanodeInfo(
- new DatanodeID(addr.getAddress().getHostAddress(),
- addr.getHostName(), poolId, addr.getPort(), 0, 0, 0))).
+ setDatanodeInfo(new DatanodeInfo(datanodeId)).
setCachingStrategy(CachingStrategy.newDefaultStrategy()).
setConfiguration(conf).
setRemotePeerFactory(new RemotePeerFactory() {
@Override
- public Peer newConnectedPeer(InetSocketAddress addr)
+ public Peer newConnectedPeer(InetSocketAddress addr,
+ Token blockToken, DatanodeID datanodeId)
throws IOException {
Peer peer = null;
Socket sock = NetUtils.getDefaultSocketFactory(conf).createSocket();
try {
sock.connect(addr, HdfsServerConstants.READ_TIMEOUT);
sock.setSoTimeout(HdfsServerConstants.READ_TIMEOUT);
- peer = TcpPeerServer.peerFromSocketAndKey(sock, encryptionKey);
+ peer = TcpPeerServer.peerFromSocketAndKey(saslClient, sock, dfs,
+ blockToken, datanodeId);
} finally {
if (peer == null) {
IOUtils.closeSocket(sock);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
index 4c1d39da7ba..4a36472cb00 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
@@ -52,7 +52,9 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_RESTART_REPLICA_
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
+import org.apache.hadoop.security.SaslPropertiesResolver;
/**
* Simple class encapsulating all of the configuration that the DataNode
@@ -86,6 +88,7 @@ public class DNConf {
final String minimumNameNodeVersion;
final String encryptionAlgorithm;
+ final SaslPropertiesResolver saslPropsResolver;
final TrustedChannelResolver trustedChannelResolver;
final long xceiverStopTimeout;
@@ -168,6 +171,8 @@ public class DNConf {
DFS_ENCRYPT_DATA_TRANSFER_DEFAULT);
this.encryptionAlgorithm = conf.get(DFS_DATA_ENCRYPTION_ALGORITHM_KEY);
this.trustedChannelResolver = TrustedChannelResolver.getInstance(conf);
+ this.saslPropsResolver = DataTransferSaslUtil.getSaslPropertiesResolver(
+ conf);
this.xceiverStopTimeout = conf.getLong(
DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_KEY,
@@ -186,7 +191,26 @@ public class DNConf {
String getMinimumNameNodeVersion() {
return this.minimumNameNodeVersion;
}
-
+
+ /**
+ * Returns true if encryption enabled for DataTransferProtocol.
+ *
+ * @return boolean true if encryption enabled for DataTransferProtocol
+ */
+ public boolean getEncryptDataTransfer() {
+ return encryptDataTransfer;
+ }
+
+ /**
+ * Returns encryption algorithm configured for DataTransferProtocol, or null
+ * if not configured.
+ *
+ * @return encryption algorithm configured for DataTransferProtocol
+ */
+ public String getEncryptionAlgorithm() {
+ return encryptionAlgorithm;
+ }
+
public long getXceiverStopTimeout() {
return xceiverStopTimeout;
}
@@ -194,4 +218,24 @@ public class DNConf {
public long getMaxLockedMemory() {
return maxLockedMemory;
}
+
+ /**
+ * Returns the SaslPropertiesResolver configured for use with
+ * DataTransferProtocol, or null if not configured.
+ *
+ * @return SaslPropertiesResolver configured for use with DataTransferProtocol
+ */
+ public SaslPropertiesResolver getSaslPropsResolver() {
+ return saslPropsResolver;
+ }
+
+ /**
+ * Returns the TrustedChannelResolver configured for use with
+ * DataTransferProtocol, or null if not configured.
+ *
+ * @return TrustedChannelResolver configured for use with DataTransferProtocol
+ */
+ public TrustedChannelResolver getTrustedChannelResolver() {
+ return trustedChannelResolver;
+ }
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index db3ff9c0d7d..8850f116a38 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -17,6 +17,9 @@
*/
package org.apache.hadoop.hdfs.server.datanode;
+import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT;
+import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY;
+
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
@@ -41,6 +44,9 @@ import org.apache.hadoop.hdfs.net.DomainPeerServer;
import org.apache.hadoop.hdfs.net.TcpPeerServer;
import org.apache.hadoop.hdfs.protocol.*;
import org.apache.hadoop.hdfs.protocol.datatransfer.*;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferServer;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.ClientDatanodeProtocolService;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DNTransferAckProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
@@ -227,6 +233,8 @@ public class DataNode extends Configured
private final List usersWithLocalPathAccess;
private final boolean connectToDnViaHostname;
ReadaheadPool readaheadPool;
+ SaslDataTransferClient saslClient;
+ SaslDataTransferServer saslServer;
private final boolean getHdfsBlockLocationsEnabled;
private ObjectName dataNodeInfoBeanName;
private Thread checkDiskErrorThread = null;
@@ -729,15 +737,10 @@ public class DataNode extends Configured
*/
void startDataNode(Configuration conf,
List dataDirs,
- // DatanodeProtocol namenode,
SecureResources resources
) throws IOException {
- if(UserGroupInformation.isSecurityEnabled() && resources == null) {
- if (!conf.getBoolean("ignore.secure.ports.for.testing", false)) {
- throw new RuntimeException("Cannot start secure cluster without "
- + "privileged resources.");
- }
- }
+
+ checkSecureConfig(conf, resources);
// settings global for all BPs in the Data Node
this.secureResources = resources;
@@ -797,6 +800,55 @@ public class DataNode extends Configured
// Create the ReadaheadPool from the DataNode context so we can
// exit without having to explicitly shutdown its thread pool.
readaheadPool = ReadaheadPool.getInstance();
+ saslClient = new SaslDataTransferClient(dnConf.saslPropsResolver,
+ dnConf.trustedChannelResolver,
+ conf.getBoolean(
+ IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY,
+ IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT));
+ saslServer = new SaslDataTransferServer(dnConf, blockPoolTokenSecretManager);
+ }
+
+ /**
+ * Checks if the DataNode has a secure configuration if security is enabled.
+ * There are 2 possible configurations that are considered secure:
+ * 1. The server has bound to privileged ports for RPC and HTTP via
+ * SecureDataNodeStarter.
+ * 2. The configuration enables SASL on DataTransferProtocol and HTTPS (no
+ * plain HTTP) for the HTTP server. The SASL handshake guarantees
+ * authentication of the RPC server before a client transmits a secret, such
+ * as a block access token. Similarly, SSL guarantees authentication of the
+ * HTTP server before a client transmits a secret, such as a delegation
+ * token.
+ * It is not possible to run with both privileged ports and SASL on
+ * DataTransferProtocol. For backwards-compatibility, the connection logic
+ * must check if the target port is a privileged port, and if so, skip the
+ * SASL handshake.
+ *
+ * @param conf Configuration to check
+ * @param resources SecuredResources obtained for DataNode
+ * @throws RuntimeException if security enabled, but configuration is insecure
+ */
+ private static void checkSecureConfig(Configuration conf,
+ SecureResources resources) throws RuntimeException {
+ if (!UserGroupInformation.isSecurityEnabled()) {
+ return;
+ }
+ String dataTransferProtection = conf.get(DFS_DATA_TRANSFER_PROTECTION_KEY);
+ if (resources != null && dataTransferProtection == null) {
+ return;
+ }
+ if (conf.getBoolean("ignore.secure.ports.for.testing", false)) {
+ return;
+ }
+ if (dataTransferProtection != null &&
+ DFSUtil.getHttpPolicy(conf) == HttpConfig.Policy.HTTPS_ONLY &&
+ resources == null) {
+ return;
+ }
+ throw new RuntimeException("Cannot start secure DataNode without " +
+ "configuring either privileged resources or SASL RPC data transfer " +
+ "protection and SSL for HTTP. Using privileged resources in " +
+ "combination with SASL RPC data transfer protection is not supported.");
}
public static String generateUuid() {
@@ -1630,28 +1682,6 @@ public class DataNode extends Configured
NetUtils.connect(sock, curTarget, dnConf.socketTimeout);
sock.setSoTimeout(targets.length * dnConf.socketTimeout);
- long writeTimeout = dnConf.socketWriteTimeout +
- HdfsServerConstants.WRITE_TIMEOUT_EXTENSION * (targets.length-1);
- OutputStream unbufOut = NetUtils.getOutputStream(sock, writeTimeout);
- InputStream unbufIn = NetUtils.getInputStream(sock);
- if (dnConf.encryptDataTransfer &&
- !dnConf.trustedChannelResolver.isTrusted(sock.getInetAddress())) {
- IOStreamPair encryptedStreams =
- DataTransferEncryptor.getEncryptedStreams(
- unbufOut, unbufIn,
- blockPoolTokenSecretManager.generateDataEncryptionKey(
- b.getBlockPoolId()));
- unbufOut = encryptedStreams.out;
- unbufIn = encryptedStreams.in;
- }
-
- out = new DataOutputStream(new BufferedOutputStream(unbufOut,
- HdfsConstants.SMALL_BUFFER_SIZE));
- in = new DataInputStream(unbufIn);
- blockSender = new BlockSender(b, 0, b.getNumBytes(),
- false, false, true, DataNode.this, null, cachingStrategy);
- DatanodeInfo srcNode = new DatanodeInfo(bpReg);
-
//
// Header info
//
@@ -1661,6 +1691,24 @@ public class DataNode extends Configured
EnumSet.of(BlockTokenSecretManager.AccessMode.WRITE));
}
+ long writeTimeout = dnConf.socketWriteTimeout +
+ HdfsServerConstants.WRITE_TIMEOUT_EXTENSION * (targets.length-1);
+ OutputStream unbufOut = NetUtils.getOutputStream(sock, writeTimeout);
+ InputStream unbufIn = NetUtils.getInputStream(sock);
+ DataEncryptionKeyFactory keyFactory =
+ getDataEncryptionKeyFactoryForBlock(b);
+ IOStreamPair saslStreams = saslClient.socketSend(sock, unbufOut,
+ unbufIn, keyFactory, accessToken, bpReg);
+ unbufOut = saslStreams.out;
+ unbufIn = saslStreams.in;
+
+ out = new DataOutputStream(new BufferedOutputStream(unbufOut,
+ HdfsConstants.SMALL_BUFFER_SIZE));
+ in = new DataInputStream(unbufIn);
+ blockSender = new BlockSender(b, 0, b.getNumBytes(),
+ false, false, true, DataNode.this, null, cachingStrategy);
+ DatanodeInfo srcNode = new DatanodeInfo(bpReg);
+
new Sender(out).writeBlock(b, accessToken, clientname, targets, srcNode,
stage, 0, 0, 0, 0, blockSender.getChecksum(), cachingStrategy);
@@ -1703,7 +1751,26 @@ public class DataNode extends Configured
}
}
}
-
+
+ /**
+ * Returns a new DataEncryptionKeyFactory that generates a key from the
+ * BlockPoolTokenSecretManager, using the block pool ID of the given block.
+ *
+ * @param block for which the factory needs to create a key
+ * @return DataEncryptionKeyFactory for block's block pool ID
+ */
+ DataEncryptionKeyFactory getDataEncryptionKeyFactoryForBlock(
+ final ExtendedBlock block) {
+ return new DataEncryptionKeyFactory() {
+ @Override
+ public DataEncryptionKey newDataEncryptionKey() {
+ return dnConf.encryptDataTransfer ?
+ blockPoolTokenSecretManager.generateDataEncryptionKey(
+ block.getBlockPoolId()) : null;
+ }
+ };
+ }
+
/**
* After a block becomes finalized, a datanode increases metric counter,
* notifies namenode, and adds it to the block scanner
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
index a118fcb1ff3..7b730905156 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
@@ -36,11 +36,9 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.io.OutputStream;
-import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketException;
-import java.net.UnknownHostException;
import java.nio.channels.ClosedChannelException;
import java.security.MessageDigest;
import java.util.Arrays;
@@ -52,13 +50,12 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
-import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor;
-import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor.InvalidMagicNumberException;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
import org.apache.hadoop.hdfs.protocol.datatransfer.Op;
import org.apache.hadoop.hdfs.protocol.datatransfer.Receiver;
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientReadStatusProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumResponseProto;
@@ -85,7 +82,6 @@ import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.DataChecksum;
import com.google.common.base.Preconditions;
-import com.google.common.net.InetAddresses;
import com.google.protobuf.ByteString;
@@ -174,24 +170,11 @@ class DataXceiver extends Receiver implements Runnable {
dataXceiverServer.addPeer(peer, Thread.currentThread());
peer.setWriteTimeout(datanode.getDnConf().socketWriteTimeout);
InputStream input = socketIn;
- if ((!peer.hasSecureChannel()) && dnConf.encryptDataTransfer &&
- !dnConf.trustedChannelResolver.isTrusted(getClientAddress(peer))){
- IOStreamPair encryptedStreams = null;
- try {
- encryptedStreams = DataTransferEncryptor.getEncryptedStreams(socketOut,
- socketIn, datanode.blockPoolTokenSecretManager,
- dnConf.encryptionAlgorithm);
- } catch (InvalidMagicNumberException imne) {
- LOG.info("Failed to read expected encryption handshake from client " +
- "at " + peer.getRemoteAddressString() + ". Perhaps the client " +
- "is running an older version of Hadoop which does not support " +
- "encryption");
- return;
- }
- input = encryptedStreams.in;
- socketOut = encryptedStreams.out;
- }
- input = new BufferedInputStream(input, HdfsConstants.SMALL_BUFFER_SIZE);
+ IOStreamPair saslStreams = datanode.saslServer.receive(peer, socketOut,
+ socketIn, datanode.getDatanodeId());
+ input = new BufferedInputStream(saslStreams.in,
+ HdfsConstants.SMALL_BUFFER_SIZE);
+ socketOut = saslStreams.out;
super.initialize(new DataInputStream(input));
@@ -263,19 +246,6 @@ class DataXceiver extends Receiver implements Runnable {
}
}
}
-
- /**
- * Returns InetAddress from peer
- * The getRemoteAddressString is the form /ip-address:port
- * The ip-address is extracted from peer and InetAddress is formed
- * @param peer
- * @return
- * @throws UnknownHostException
- */
- private static InetAddress getClientAddress(Peer peer) {
- return InetAddresses.forString(
- peer.getRemoteAddressString().split(":")[0].substring(1));
- }
@Override
public void requestShortCircuitFds(final ExtendedBlock blk,
@@ -656,17 +626,12 @@ class DataXceiver extends Receiver implements Runnable {
OutputStream unbufMirrorOut = NetUtils.getOutputStream(mirrorSock,
writeTimeout);
InputStream unbufMirrorIn = NetUtils.getInputStream(mirrorSock);
- if (dnConf.encryptDataTransfer &&
- !dnConf.trustedChannelResolver.isTrusted(mirrorSock.getInetAddress())) {
- IOStreamPair encryptedStreams =
- DataTransferEncryptor.getEncryptedStreams(
- unbufMirrorOut, unbufMirrorIn,
- datanode.blockPoolTokenSecretManager
- .generateDataEncryptionKey(block.getBlockPoolId()));
-
- unbufMirrorOut = encryptedStreams.out;
- unbufMirrorIn = encryptedStreams.in;
- }
+ DataEncryptionKeyFactory keyFactory =
+ datanode.getDataEncryptionKeyFactoryForBlock(block);
+ IOStreamPair saslStreams = datanode.saslClient.socketSend(mirrorSock,
+ unbufMirrorOut, unbufMirrorIn, keyFactory, blockToken, targets[0]);
+ unbufMirrorOut = saslStreams.out;
+ unbufMirrorIn = saslStreams.in;
mirrorOut = new DataOutputStream(new BufferedOutputStream(unbufMirrorOut,
HdfsConstants.SMALL_BUFFER_SIZE));
mirrorIn = new DataInputStream(unbufMirrorIn);
@@ -1026,17 +991,12 @@ class DataXceiver extends Receiver implements Runnable {
OutputStream unbufProxyOut = NetUtils.getOutputStream(proxySock,
dnConf.socketWriteTimeout);
InputStream unbufProxyIn = NetUtils.getInputStream(proxySock);
- if (dnConf.encryptDataTransfer &&
- !dnConf.trustedChannelResolver.isTrusted(
- proxySock.getInetAddress())) {
- IOStreamPair encryptedStreams =
- DataTransferEncryptor.getEncryptedStreams(
- unbufProxyOut, unbufProxyIn,
- datanode.blockPoolTokenSecretManager
- .generateDataEncryptionKey(block.getBlockPoolId()));
- unbufProxyOut = encryptedStreams.out;
- unbufProxyIn = encryptedStreams.in;
- }
+ DataEncryptionKeyFactory keyFactory =
+ datanode.getDataEncryptionKeyFactoryForBlock(block);
+ IOStreamPair saslStreams = datanode.saslClient.socketSend(proxySock,
+ unbufProxyOut, unbufProxyIn, keyFactory, blockToken, proxySource);
+ unbufProxyOut = saslStreams.out;
+ unbufProxyIn = saslStreams.in;
proxyOut = new DataOutputStream(new BufferedOutputStream(unbufProxyOut,
HdfsConstants.SMALL_BUFFER_SIZE));
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java
index c9e548b4d06..b6fe4c63ec5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java
@@ -43,6 +43,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
import org.apache.hadoop.hdfs.server.common.JspHelper;
@@ -510,7 +511,7 @@ public class DatanodeJspHelper {
JspHelper.streamBlockInAscii(new InetSocketAddress(req.getServerName(),
datanodePort), bpid, blockId, blockToken, genStamp, blockSize,
startOffset, chunkSizeToView, out, conf, dfs.getConf(),
- dfs.getDataEncryptionKey());
+ dfs, getSaslDataTransferClient(req));
} catch (Exception e) {
out.print(e);
}
@@ -669,7 +670,7 @@ public class DatanodeJspHelper {
out.print("");
dfs.close();
}
@@ -702,4 +703,16 @@ public class DatanodeJspHelper {
return sb.toString();
}
+ /**
+ * Gets the {@link SaslDataTransferClient} from the {@link DataNode} attached
+ * to the servlet context.
+ *
+ * @return SaslDataTransferClient from DataNode
+ */
+ private static SaslDataTransferClient getSaslDataTransferClient(
+ HttpServletRequest req) {
+ DataNode dataNode = (DataNode)req.getSession().getServletContext()
+ .getAttribute("datanode");
+ return dataNode.saslClient;
+ }
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
index 9a66505436b..542e60e4016 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
@@ -17,6 +17,9 @@
*/
package org.apache.hadoop.hdfs.server.namenode;
+import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT;
+import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY;
+
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.OutputStream;
@@ -47,6 +50,7 @@ import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.RemotePeerFactory;
import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.hdfs.net.TcpPeerServer;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@@ -55,6 +59,12 @@ import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
+import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementStatus;
import org.apache.hadoop.hdfs.server.blockmanagement.NumberReplicas;
@@ -65,6 +75,7 @@ import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.NodeBase;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.Time;
import com.google.common.annotations.VisibleForTesting;
@@ -92,7 +103,7 @@ import com.google.common.annotations.VisibleForTesting;
* factors of each file.
*/
@InterfaceAudience.Private
-public class NamenodeFsck {
+public class NamenodeFsck implements DataEncryptionKeyFactory {
public static final Log LOG = LogFactory.getLog(NameNode.class.getName());
// return string marking fsck status
@@ -149,6 +160,7 @@ public class NamenodeFsck {
private List snapshottableDirs = null;
private final BlockPlacementPolicy bpPolicy;
+ private final SaslDataTransferClient saslClient;
/**
* Filesystem checker.
@@ -175,6 +187,12 @@ public class NamenodeFsck {
networktopology,
namenode.getNamesystem().getBlockManager().getDatanodeManager()
.getHost2DatanodeMap());
+ this.saslClient = new SaslDataTransferClient(
+ DataTransferSaslUtil.getSaslPropertiesResolver(conf),
+ TrustedChannelResolver.getInstance(conf),
+ conf.getBoolean(
+ IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY,
+ IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT));
for (Iterator it = pmap.keySet().iterator(); it.hasNext();) {
String key = it.next();
@@ -616,15 +634,16 @@ public class NamenodeFsck {
setConfiguration(namenode.conf).
setRemotePeerFactory(new RemotePeerFactory() {
@Override
- public Peer newConnectedPeer(InetSocketAddress addr)
+ public Peer newConnectedPeer(InetSocketAddress addr,
+ Token blockToken, DatanodeID datanodeId)
throws IOException {
Peer peer = null;
Socket s = NetUtils.getDefaultSocketFactory(conf).createSocket();
try {
s.connect(addr, HdfsServerConstants.READ_TIMEOUT);
s.setSoTimeout(HdfsServerConstants.READ_TIMEOUT);
- peer = TcpPeerServer.peerFromSocketAndKey(s, namenode.getRpcServer().
- getDataEncryptionKey());
+ peer = TcpPeerServer.peerFromSocketAndKey(saslClient, s,
+ NamenodeFsck.this, blockToken, datanodeId);
} finally {
if (peer == null) {
IOUtils.closeQuietly(s);
@@ -663,7 +682,12 @@ public class NamenodeFsck {
throw new Exception("Could not copy block data for " + lblock.getBlock());
}
}
-
+
+ @Override
+ public DataEncryptionKey newDataEncryptionKey() throws IOException {
+ return namenode.getRpcServer().getDataEncryptionKey();
+ }
+
/*
* XXX (ab) See comment above for copyBlock().
*
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 1a03da11785..6cca87aca71 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -1451,6 +1451,37 @@
+
+ dfs.data.transfer.protection
+
+
+ A comma-separated list of SASL protection values used for secured
+ connections to the DataNode when reading or writing block data. Possible
+ values are authentication, integrity and privacy. authentication means
+ authentication only and no integrity or privacy; integrity implies
+ authentication and integrity are enabled; and privacy implies all of
+ authentication, integrity and privacy are enabled. If
+ dfs.encrypt.data.transfer is set to true, then it supersedes the setting for
+ dfs.data.transfer.protection and enforces that all connections must use a
+ specialized encrypted SASL handshake. This property is ignored for
+ connections to a DataNode listening on a privileged port. In this case, it
+ is assumed that the use of a privileged port establishes sufficient trust.
+
+
+
+
+ dfs.data.transfer.saslproperties.resolver.class
+
+
+ SaslPropertiesResolver used to resolve the QOP used for a connection to the
+ DataNode when reading or writing block data. If not specified, the full set
+ of values specified in dfs.data.transfer.protection is used while
+ determining the QOP used for the connection. If a class is specified, then
+ the QOP values returned by the class will be used while determining the QOP
+ used for the connection.
+
+
+
dfs.datanode.hdfs-blocks-metadata.enabled
false
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java
index 0de3ab2e915..88b7f37dcce 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java
@@ -33,9 +33,11 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.hdfs.net.TcpPeerServer;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.blockmanagement.CacheReplicationMonitor;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
@@ -48,6 +50,7 @@ import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache;
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitReplica;
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm;
import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.token.Token;
import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
@@ -192,7 +195,8 @@ public class BlockReaderTestUtil {
setAllowShortCircuitLocalReads(true).
setRemotePeerFactory(new RemotePeerFactory() {
@Override
- public Peer newConnectedPeer(InetSocketAddress addr)
+ public Peer newConnectedPeer(InetSocketAddress addr,
+ Token blockToken, DatanodeID datanodeId)
throws IOException {
Peer peer = null;
Socket sock = NetUtils.
@@ -251,4 +255,4 @@ public class BlockReaderTestUtil {
LogManager.getLogger(DataNode.class.getName()).setLevel(
Level.TRACE);
}
-}
\ No newline at end of file
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
index 98eb5f7e4b2..5b1c6e35aa9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
@@ -19,12 +19,15 @@ package org.apache.hadoop.hdfs;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY;
+import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SASL_DEFAULT;
+import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SASL_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTP_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_LOGROLL_PERIOD_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODE_ID_KEY;
@@ -1310,15 +1313,42 @@ public class MiniDFSCluster {
}
SecureResources secureResources = null;
- if (UserGroupInformation.isSecurityEnabled()) {
+ if (UserGroupInformation.isSecurityEnabled() &&
+ conf.get(DFS_DATA_TRANSFER_PROTECTION_KEY) == null) {
try {
secureResources = SecureDataNodeStarter.getSecureResources(dnConf);
} catch (Exception ex) {
ex.printStackTrace();
}
}
- DataNode dn = DataNode.instantiateDataNode(dnArgs, dnConf,
- secureResources);
+ final int maxRetriesOnSasl = conf.getInt(
+ IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SASL_KEY,
+ IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SASL_DEFAULT);
+ int numRetries = 0;
+ DataNode dn = null;
+ while (true) {
+ try {
+ dn = DataNode.instantiateDataNode(dnArgs, dnConf,
+ secureResources);
+ break;
+ } catch (IOException e) {
+ // Work around issue testing security where rapidly starting multiple
+ // DataNodes using the same principal gets rejected by the KDC as a
+ // replay attack.
+ if (UserGroupInformation.isSecurityEnabled() &&
+ numRetries < maxRetriesOnSasl) {
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ break;
+ }
+ ++numRetries;
+ continue;
+ }
+ throw e;
+ }
+ }
if(dn == null)
throw new IOException("Cannot start DataNode in "
+ dnConf.get(DFS_DATANODE_DATA_DIR_KEY));
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferTestCase.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferTestCase.java
new file mode 100644
index 00000000000..6f2b3aa88f5
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferTestCase.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocol.datatransfer.sasl;
+
+import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SASL_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KERBEROS_PRINCIPAL_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HTTP_POLICY_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY;
+import static org.junit.Assert.*;
+
+import java.io.File;
+import java.util.Properties;
+
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.http.HttpConfig;
+import org.apache.hadoop.minikdc.MiniKdc;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
+import org.apache.hadoop.security.ssl.KeyStoreTestUtil;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+public abstract class SaslDataTransferTestCase {
+
+ private static File baseDir;
+ private static String hdfsPrincipal;
+ private static MiniKdc kdc;
+ private static String keytab;
+ private static String spnegoPrincipal;
+
+ @BeforeClass
+ public static void initKdc() throws Exception {
+ baseDir = new File(System.getProperty("test.build.dir", "target/test-dir"),
+ SaslDataTransferTestCase.class.getSimpleName());
+ FileUtil.fullyDelete(baseDir);
+ assertTrue(baseDir.mkdirs());
+
+ Properties kdcConf = MiniKdc.createConf();
+ kdc = new MiniKdc(kdcConf, baseDir);
+ kdc.start();
+
+ String userName = UserGroupInformation.getLoginUser().getShortUserName();
+ File keytabFile = new File(baseDir, userName + ".keytab");
+ keytab = keytabFile.getAbsolutePath();
+ kdc.createPrincipal(keytabFile, userName + "/localhost", "HTTP/localhost");
+ hdfsPrincipal = userName + "/localhost@" + kdc.getRealm();
+ spnegoPrincipal = "HTTP/localhost@" + kdc.getRealm();
+ }
+
+ @AfterClass
+ public static void shutdownKdc() {
+ if (kdc != null) {
+ kdc.stop();
+ }
+ FileUtil.fullyDelete(baseDir);
+ }
+
+ /**
+ * Creates configuration for starting a secure cluster.
+ *
+ * @param dataTransferProtection supported QOPs
+ * @return configuration for starting a secure cluster
+ * @throws Exception if there is any failure
+ */
+ protected HdfsConfiguration createSecureConfig(
+ String dataTransferProtection) throws Exception {
+ HdfsConfiguration conf = new HdfsConfiguration();
+ SecurityUtil.setAuthenticationMethod(AuthenticationMethod.KERBEROS, conf);
+ conf.set(DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY, hdfsPrincipal);
+ conf.set(DFS_NAMENODE_KEYTAB_FILE_KEY, keytab);
+ conf.set(DFS_DATANODE_KERBEROS_PRINCIPAL_KEY, hdfsPrincipal);
+ conf.set(DFS_DATANODE_KEYTAB_FILE_KEY, keytab);
+ conf.set(DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY, spnegoPrincipal);
+ conf.setBoolean(DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);
+ conf.set(DFS_DATA_TRANSFER_PROTECTION_KEY, dataTransferProtection);
+ conf.set(DFS_HTTP_POLICY_KEY, HttpConfig.Policy.HTTPS_ONLY.name());
+ conf.set(DFS_NAMENODE_HTTPS_ADDRESS_KEY, "localhost:0");
+ conf.set(DFS_DATANODE_HTTPS_ADDRESS_KEY, "localhost:0");
+ conf.setInt(IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SASL_KEY, 10);
+
+ String keystoresDir = baseDir.getAbsolutePath();
+ String sslConfDir = KeyStoreTestUtil.getClasspathDir(this.getClass());
+ KeyStoreTestUtil.setupSSLConfig(keystoresDir, sslConfDir, conf, false);
+ return conf;
+ }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/TestSaslDataTransfer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/TestSaslDataTransfer.java
new file mode 100644
index 00000000000..7602f44b0b0
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/TestSaslDataTransfer.java
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocol.datatransfer.sasl;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY;
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileSystemTestHelper;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.IOUtils;
+import org.junit.After;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+public class TestSaslDataTransfer extends SaslDataTransferTestCase {
+
+ private static final int BLOCK_SIZE = 4096;
+ private static final int BUFFER_SIZE= 1024;
+ private static final int NUM_BLOCKS = 3;
+ private static final Path PATH = new Path("/file1");
+ private static final short REPLICATION = 3;
+
+ private MiniDFSCluster cluster;
+ private FileSystem fs;
+
+ @Rule
+ public ExpectedException exception = ExpectedException.none();
+
+ @After
+ public void shutdown() {
+ IOUtils.cleanup(null, fs);
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+
+ @Test
+ public void testAuthentication() throws Exception {
+ HdfsConfiguration clusterConf = createSecureConfig(
+ "authentication,integrity,privacy");
+ startCluster(clusterConf);
+ HdfsConfiguration clientConf = new HdfsConfiguration(clusterConf);
+ clientConf.set(DFS_DATA_TRANSFER_PROTECTION_KEY, "authentication");
+ doTest(clientConf);
+ }
+
+ @Test
+ public void testIntegrity() throws Exception {
+ HdfsConfiguration clusterConf = createSecureConfig(
+ "authentication,integrity,privacy");
+ startCluster(clusterConf);
+ HdfsConfiguration clientConf = new HdfsConfiguration(clusterConf);
+ clientConf.set(DFS_DATA_TRANSFER_PROTECTION_KEY, "integrity");
+ doTest(clientConf);
+ }
+
+ @Test
+ public void testPrivacy() throws Exception {
+ HdfsConfiguration clusterConf = createSecureConfig(
+ "authentication,integrity,privacy");
+ startCluster(clusterConf);
+ HdfsConfiguration clientConf = new HdfsConfiguration(clusterConf);
+ clientConf.set(DFS_DATA_TRANSFER_PROTECTION_KEY, "privacy");
+ doTest(clientConf);
+ }
+
+ @Test
+ public void testClientAndServerDoNotHaveCommonQop() throws Exception {
+ HdfsConfiguration clusterConf = createSecureConfig("privacy");
+ startCluster(clusterConf);
+ HdfsConfiguration clientConf = new HdfsConfiguration(clusterConf);
+ clientConf.set(DFS_DATA_TRANSFER_PROTECTION_KEY, "authentication");
+ exception.expect(IOException.class);
+ exception.expectMessage("could only be replicated to 0 nodes");
+ doTest(clientConf);
+ }
+
+ @Test
+ public void testClientSaslNoServerSasl() throws Exception {
+ HdfsConfiguration clusterConf = createSecureConfig("");
+ startCluster(clusterConf);
+ HdfsConfiguration clientConf = new HdfsConfiguration(clusterConf);
+ clientConf.set(DFS_DATA_TRANSFER_PROTECTION_KEY, "authentication");
+ exception.expect(IOException.class);
+ exception.expectMessage("could only be replicated to 0 nodes");
+ doTest(clientConf);
+ }
+
+ @Test
+ public void testServerSaslNoClientSasl() throws Exception {
+ HdfsConfiguration clusterConf = createSecureConfig(
+ "authentication,integrity,privacy");
+ startCluster(clusterConf);
+ HdfsConfiguration clientConf = new HdfsConfiguration(clusterConf);
+ clientConf.set(DFS_DATA_TRANSFER_PROTECTION_KEY, "");
+ exception.expect(IOException.class);
+ exception.expectMessage("could only be replicated to 0 nodes");
+ doTest(clientConf);
+ }
+
+ /**
+ * Tests DataTransferProtocol with the given client configuration.
+ *
+ * @param conf client configuration
+ * @throws IOException if there is an I/O error
+ */
+ private void doTest(HdfsConfiguration conf) throws IOException {
+ fs = FileSystem.get(cluster.getURI(), conf);
+ FileSystemTestHelper.createFile(fs, PATH, NUM_BLOCKS, BLOCK_SIZE);
+ assertArrayEquals(FileSystemTestHelper.getFileData(NUM_BLOCKS, BLOCK_SIZE),
+ DFSTestUtil.readFile(fs, PATH).getBytes("UTF-8"));
+ BlockLocation[] blockLocations = fs.getFileBlockLocations(PATH, 0,
+ Long.MAX_VALUE);
+ assertNotNull(blockLocations);
+ assertEquals(NUM_BLOCKS, blockLocations.length);
+ for (BlockLocation blockLocation: blockLocations) {
+ assertNotNull(blockLocation.getHosts());
+ assertEquals(3, blockLocation.getHosts().length);
+ }
+ }
+
+ /**
+ * Starts a cluster with the given configuration.
+ *
+ * @param conf cluster configuration
+ * @throws IOException if there is an I/O error
+ */
+ private void startCluster(HdfsConfiguration conf) throws IOException {
+ cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
+ cluster.waitActive();
+ }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithSaslDataTransfer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithSaslDataTransfer.java
new file mode 100644
index 00000000000..b579c8937bd
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithSaslDataTransfer.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.balancer;
+
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferTestCase;
+import org.junit.Test;
+
+public class TestBalancerWithSaslDataTransfer extends SaslDataTransferTestCase {
+
+ private static final TestBalancer TEST_BALANCER = new TestBalancer();
+
+ @Test
+ public void testBalancer0Authentication() throws Exception {
+ TEST_BALANCER.testBalancer0Internal(createSecureConfig("authentication"));
+ }
+
+ @Test
+ public void testBalancer0Integrity() throws Exception {
+ TEST_BALANCER.testBalancer0Internal(createSecureConfig("integrity"));
+ }
+
+ @Test
+ public void testBalancer0Privacy() throws Exception {
+ TEST_BALANCER.testBalancer0Internal(createSecureConfig("privacy"));
+ }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java
index 0c3c5806e8c..b15cb380976 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java
@@ -45,6 +45,7 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.RemotePeerFactory;
import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.hdfs.net.TcpPeerServer;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
@@ -160,7 +161,8 @@ public class TestBlockTokenWithDFS {
setConfiguration(conf).
setRemotePeerFactory(new RemotePeerFactory() {
@Override
- public Peer newConnectedPeer(InetSocketAddress addr)
+ public Peer newConnectedPeer(InetSocketAddress addr,
+ Token blockToken, DatanodeID datanodeId)
throws IOException {
Peer peer = null;
Socket sock = NetUtils.getDefaultSocketFactory(conf).createSocket();
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
index 3644c33a76d..38403eb8258 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
@@ -46,9 +46,11 @@ import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.hdfs.net.TcpPeerServer;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
@@ -58,6 +60,7 @@ import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.token.Token;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -307,7 +310,8 @@ public class TestDataNodeVolumeFailure {
setConfiguration(conf).
setRemotePeerFactory(new RemotePeerFactory() {
@Override
- public Peer newConnectedPeer(InetSocketAddress addr)
+ public Peer newConnectedPeer(InetSocketAddress addr,
+ Token blockToken, DatanodeID datanodeId)
throws IOException {
Peer peer = null;
Socket sock = NetUtils.getDefaultSocketFactory(conf).createSocket();