HDFS-3637. Add support for encrypting the DataTransferProtocol. Contributed by Aaron T. Myers.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1370360 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Aaron Myers 2012-08-07 16:46:03 +00:00
parent 88593c443d
commit 9c8485f099
53 changed files with 1973 additions and 241 deletions

View File

@ -620,7 +620,8 @@ public abstract class FileSystem extends Configured implements Closeable {
conf.getInt("io.bytes.per.checksum", 512),
64 * 1024,
getDefaultReplication(),
conf.getInt("io.file.buffer.size", 4096));
conf.getInt("io.file.buffer.size", 4096),
false);
}
/**

View File

@ -48,17 +48,20 @@ public class FsServerDefaults implements Writable {
private int writePacketSize;
private short replication;
private int fileBufferSize;
private boolean encryptDataTransfer;
public FsServerDefaults() {
}
public FsServerDefaults(long blockSize, int bytesPerChecksum,
int writePacketSize, short replication, int fileBufferSize) {
int writePacketSize, short replication, int fileBufferSize,
boolean encryptDataTransfer) {
this.blockSize = blockSize;
this.bytesPerChecksum = bytesPerChecksum;
this.writePacketSize = writePacketSize;
this.replication = replication;
this.fileBufferSize = fileBufferSize;
this.encryptDataTransfer = encryptDataTransfer;
}
public long getBlockSize() {
@ -80,6 +83,10 @@ public class FsServerDefaults implements Writable {
public int getFileBufferSize() {
return fileBufferSize;
}
public boolean getEncryptDataTransfer() {
return encryptDataTransfer;
}
// /////////////////////////////////////////
// Writable

View File

@ -44,6 +44,7 @@ public class FtpConfigKeys extends CommonConfigurationKeys {
public static final String CLIENT_WRITE_PACKET_SIZE_KEY =
"ftp.client-write-packet-size";
public static final int CLIENT_WRITE_PACKET_SIZE_DEFAULT = 64*1024;
public static final boolean ENCRYPT_DATA_TRANSFER_DEFAULT = false;
protected static FsServerDefaults getServerDefaults() throws IOException {
return new FsServerDefaults(
@ -51,7 +52,8 @@ public class FtpConfigKeys extends CommonConfigurationKeys {
BYTES_PER_CHECKSUM_DEFAULT,
CLIENT_WRITE_PACKET_SIZE_DEFAULT,
REPLICATION_DEFAULT,
STREAM_BUFFER_SIZE_DEFAULT);
STREAM_BUFFER_SIZE_DEFAULT,
ENCRYPT_DATA_TRANSFER_DEFAULT);
}
}

View File

@ -43,6 +43,7 @@ public class LocalConfigKeys extends CommonConfigurationKeys {
public static final String CLIENT_WRITE_PACKET_SIZE_KEY =
"file.client-write-packet-size";
public static final int CLIENT_WRITE_PACKET_SIZE_DEFAULT = 64*1024;
public static final boolean ENCRYPT_DATA_TRANSFER_DEFAULT = false;
public static FsServerDefaults getServerDefaults() throws IOException {
return new FsServerDefaults(
@ -50,7 +51,8 @@ public class LocalConfigKeys extends CommonConfigurationKeys {
BYTES_PER_CHECKSUM_DEFAULT,
CLIENT_WRITE_PACKET_SIZE_DEFAULT,
REPLICATION_DEFAULT,
STREAM_BUFFER_SIZE_DEFAULT);
STREAM_BUFFER_SIZE_DEFAULT,
ENCRYPT_DATA_TRANSFER_DEFAULT);
}
}

View File

@ -22,6 +22,8 @@ import java.io.DataInputStream;
import java.io.EOFException;
import java.io.InputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import javax.security.sasl.Sasl;
import javax.security.sasl.SaslClient;
@ -42,7 +44,7 @@ import org.apache.hadoop.classification.InterfaceStability;
*/
@InterfaceAudience.LimitedPrivate({"HDFS", "MapReduce"})
@InterfaceStability.Evolving
public class SaslInputStream extends InputStream {
public class SaslInputStream extends InputStream implements ReadableByteChannel {
public static final Log LOG = LogFactory.getLog(SaslInputStream.class);
private final DataInputStream inStream;
@ -65,6 +67,8 @@ public class SaslInputStream extends InputStream {
private int ostart = 0;
// position of the last "new" byte
private int ofinish = 0;
// whether or not this stream is open
private boolean isOpen = true;
private static int unsignedBytesToInt(byte[] buf) {
if (buf.length != 4) {
@ -330,6 +334,7 @@ public class SaslInputStream extends InputStream {
ostart = 0;
ofinish = 0;
inStream.close();
isOpen = false;
}
/**
@ -342,4 +347,28 @@ public class SaslInputStream extends InputStream {
public boolean markSupported() {
return false;
}
@Override
public boolean isOpen() {
return isOpen;
}
@Override
public int read(ByteBuffer dst) throws IOException {
int bytesRead = 0;
if (dst.hasArray()) {
bytesRead = read(dst.array(), dst.arrayOffset() + dst.position(),
dst.remaining());
if (bytesRead > -1) {
dst.position(dst.position() + bytesRead);
}
} else {
byte[] buf = new byte[dst.remaining()];
bytesRead = read(buf);
if (bytesRead > -1) {
dst.put(buf, 0, bytesRead);
}
}
return bytesRead;
}
}

View File

@ -21,6 +21,8 @@ Release 2.0.1-alpha - UNRELEASED
HDFS-3513. HttpFS should cache filesystems. (tucu)
HDFS-3637. Add support for encrypting the DataTransferProtocol. (atm)
IMPROVEMENTS
HDFS-3390. DFSAdmin should print full stack traces of errors when DEBUG

View File

@ -21,6 +21,7 @@ import java.io.IOException;
import java.net.Socket;
import org.apache.hadoop.fs.ByteBufferReadable;
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
/**
* A BlockReader is responsible for reading a single block
@ -71,4 +72,8 @@ public interface BlockReader extends ByteBufferReadable {
*/
boolean hasSentStatusCode();
/**
* @return a reference to the streams this block reader is using.
*/
IOStreamPair getStreams();
}

View File

@ -25,7 +25,12 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSClient.Conf;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor;
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.token.Token;
@ -41,12 +46,13 @@ public class BlockReaderFactory {
Configuration conf,
Socket sock, String file,
ExtendedBlock block, Token<BlockTokenIdentifier> blockToken,
long startOffset, long len) throws IOException {
long startOffset, long len, DataEncryptionKey encryptionKey)
throws IOException {
int bufferSize = conf.getInt(DFSConfigKeys.IO_FILE_BUFFER_SIZE_KEY,
DFSConfigKeys.IO_FILE_BUFFER_SIZE_DEFAULT);
return newBlockReader(new Conf(conf),
sock, file, block, blockToken, startOffset,
len, bufferSize, true, "");
len, bufferSize, true, "", encryptionKey, null);
}
/**
@ -73,14 +79,32 @@ public class BlockReaderFactory {
Token<BlockTokenIdentifier> blockToken,
long startOffset, long len,
int bufferSize, boolean verifyChecksum,
String clientName)
String clientName,
DataEncryptionKey encryptionKey,
IOStreamPair ioStreams)
throws IOException {
if (conf.useLegacyBlockReader) {
if (encryptionKey != null) {
throw new RuntimeException("Encryption is not supported with the legacy block reader.");
}
return RemoteBlockReader.newBlockReader(
sock, file, block, blockToken, startOffset, len, bufferSize, verifyChecksum, clientName);
} else {
if (ioStreams == null) {
ioStreams = new IOStreamPair(NetUtils.getInputStream(sock),
NetUtils.getOutputStream(sock, HdfsServerConstants.WRITE_TIMEOUT));
if (encryptionKey != null) {
IOStreamPair encryptedStreams =
DataTransferEncryptor.getEncryptedStreams(
ioStreams.out, ioStreams.in, encryptionKey);
ioStreams = encryptedStreams;
}
}
return RemoteBlockReader2.newBlockReader(
sock, file, block, blockToken, startOffset, len, bufferSize, verifyChecksum, clientName);
sock, file, block, blockToken, startOffset, len, bufferSize,
verifyChecksum, clientName, encryptionKey, ioStreams);
}
}

View File

@ -35,6 +35,7 @@ import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
import org.apache.hadoop.hdfs.util.DirectBufferPool;
@ -681,4 +682,9 @@ class BlockReaderLocal implements BlockReader {
public boolean hasSentStatusCode() {
return false;
}
@Override
public IOStreamPair getStreams() {
return null;
}
}

View File

@ -45,6 +45,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKRE
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_KEY;
@ -53,6 +55,7 @@ import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
@ -109,12 +112,15 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor;
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
import org.apache.hadoop.hdfs.protocol.datatransfer.Op;
import org.apache.hadoop.hdfs.protocol.datatransfer.ReplaceDatanodeOnFailure;
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
@ -182,6 +188,7 @@ public class DFSClient implements java.io.Closeable {
final Conf dfsClientConf;
private Random r = new Random();
private SocketAddress[] localInterfaceAddrs;
private DataEncryptionKey encryptionKey;
/**
* DFSClient configuration
@ -351,9 +358,6 @@ public class DFSClient implements java.io.Closeable {
this.clientName = "DFSClient_" + dfsClientConf.taskId + "_" +
DFSUtil.getRandom().nextInt() + "_" + Thread.currentThread().getId();
this.socketCache = new SocketCache(dfsClientConf.socketCacheCapacity);
if (rpcNamenode != null) {
// This case is used for testing.
Preconditions.checkArgument(nameNodeUri == null);
@ -383,6 +387,8 @@ public class DFSClient implements java.io.Closeable {
Joiner.on(',').join(localInterfaces)+ "] with addresses [" +
Joiner.on(',').join(localInterfaceAddrs) + "]");
}
this.socketCache = new SocketCache(dfsClientConf.socketCacheCapacity);
}
/**
@ -1457,7 +1463,44 @@ public class DFSClient implements java.io.Closeable {
*/
public MD5MD5CRC32FileChecksum getFileChecksum(String src) throws IOException {
checkOpen();
return getFileChecksum(src, namenode, socketFactory, dfsClientConf.socketTimeout);
return getFileChecksum(src, namenode, socketFactory,
dfsClientConf.socketTimeout, getDataEncryptionKey());
}
@InterfaceAudience.Private
public void clearDataEncryptionKey() {
LOG.debug("Clearing encryption key");
synchronized (this) {
encryptionKey = null;
}
}
/**
* @return true if data sent between this client and DNs should be encrypted,
* false otherwise.
* @throws IOException in the event of error communicating with the NN
*/
boolean shouldEncryptData() throws IOException {
FsServerDefaults d = getServerDefaults();
return d == null ? false : d.getEncryptDataTransfer();
}
@InterfaceAudience.Private
public DataEncryptionKey getDataEncryptionKey()
throws IOException {
if (shouldEncryptData()) {
synchronized (this) {
if (encryptionKey == null ||
(encryptionKey != null &&
encryptionKey.expiryDate < Time.now())) {
LOG.debug("Getting new encryption token from NN");
encryptionKey = namenode.getDataEncryptionKey();
}
return encryptionKey;
}
} else {
return null;
}
}
/**
@ -1466,8 +1509,8 @@ public class DFSClient implements java.io.Closeable {
* @return The checksum
*/
public static MD5MD5CRC32FileChecksum getFileChecksum(String src,
ClientProtocol namenode, SocketFactory socketFactory, int socketTimeout
) throws IOException {
ClientProtocol namenode, SocketFactory socketFactory, int socketTimeout,
DataEncryptionKey encryptionKey) throws IOException {
//get all block locations
LocatedBlocks blockLocations = callGetBlockLocations(namenode, src, 0, Long.MAX_VALUE);
if (null == blockLocations) {
@ -1510,10 +1553,18 @@ public class DFSClient implements java.io.Closeable {
timeout);
sock.setSoTimeout(timeout);
out = new DataOutputStream(
new BufferedOutputStream(NetUtils.getOutputStream(sock),
HdfsConstants.SMALL_BUFFER_SIZE));
in = new DataInputStream(NetUtils.getInputStream(sock));
OutputStream unbufOut = NetUtils.getOutputStream(sock);
InputStream unbufIn = NetUtils.getInputStream(sock);
if (encryptionKey != null) {
IOStreamPair encryptedStreams =
DataTransferEncryptor.getEncryptedStreams(
unbufOut, unbufIn, encryptionKey);
unbufOut = encryptedStreams.out;
unbufIn = encryptedStreams.in;
}
out = new DataOutputStream(new BufferedOutputStream(unbufOut,
HdfsConstants.SMALL_BUFFER_SIZE));
in = new DataInputStream(unbufIn);
if (LOG.isDebugEnabled()) {
LOG.debug("write to " + datanodes[j] + ": "

View File

@ -367,4 +367,9 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final boolean DFS_HA_AUTO_FAILOVER_ENABLED_DEFAULT = false;
public static final String DFS_HA_ZKFC_PORT_KEY = "dfs.ha.zkfc.port";
public static final int DFS_HA_ZKFC_PORT_DEFAULT = 8019;
// Security-related configs
public static final String DFS_ENCRYPT_DATA_TRANSFER_KEY = "dfs.encrypt.data.transfer";
public static final boolean DFS_ENCRYPT_DATA_TRANSFER_DEFAULT = false;
public static final String DFS_DATA_ENCRYPTION_ALGORITHM_KEY = "dfs.encrypt.data.transfer.algorithm";
}

View File

@ -37,11 +37,14 @@ import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.fs.ByteBufferReadable;
import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.UnresolvedLinkException;
import org.apache.hadoop.hdfs.SocketCache.SocketAndStreams;
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
@ -425,6 +428,7 @@ public class DFSInputStream extends FSInputStream implements ByteBufferReadable
//
DatanodeInfo chosenNode = null;
int refetchToken = 1; // only need to get a new access token once
int refetchEncryptionKey = 1; // only need to get a new encryption key once
boolean connectFailedOnce = false;
@ -452,7 +456,14 @@ public class DFSInputStream extends FSInputStream implements ByteBufferReadable
}
return chosenNode;
} catch (IOException ex) {
if (ex instanceof InvalidBlockTokenException && refetchToken > 0) {
if (ex instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) {
DFSClient.LOG.info("Will fetch a new encryption key and retry, "
+ "encryption key was invalid when connecting to " + targetAddr
+ " : " + ex);
// The encryption key used is invalid.
refetchEncryptionKey--;
dfsClient.clearDataEncryptionKey();
} else if (ex instanceof InvalidBlockTokenException && refetchToken > 0) {
DFSClient.LOG.info("Will fetch a new access token and retry, "
+ "access token was invalid when connecting to " + targetAddr
+ " : " + ex);
@ -754,6 +765,7 @@ public class DFSInputStream extends FSInputStream implements ByteBufferReadable
// Connect to best DataNode for desired Block, with potential offset
//
int refetchToken = 1; // only need to get a new access token once
int refetchEncryptionKey = 1; // only need to get a new encryption key once
while (true) {
// cached block locations may have been updated by chooseDataNode()
@ -789,7 +801,14 @@ public class DFSInputStream extends FSInputStream implements ByteBufferReadable
dfsClient.disableShortCircuit();
continue;
} catch (IOException e) {
if (e instanceof InvalidBlockTokenException && refetchToken > 0) {
if (e instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) {
DFSClient.LOG.info("Will fetch a new encryption key and retry, "
+ "encryption key was invalid when connecting to " + targetAddr
+ " : " + e);
// The encryption key used is invalid.
refetchEncryptionKey--;
dfsClient.clearDataEncryptionKey();
} else if (e instanceof InvalidBlockTokenException && refetchToken > 0) {
DFSClient.LOG.info("Will get a new access token and retry, "
+ "access token was invalid when connecting to " + targetAddr
+ " : " + e);
@ -818,8 +837,9 @@ public class DFSInputStream extends FSInputStream implements ByteBufferReadable
*/
private void closeBlockReader(BlockReader reader) throws IOException {
if (reader.hasSentStatusCode()) {
IOStreamPair ioStreams = reader.getStreams();
Socket oldSock = reader.takeSocket();
socketCache.put(oldSock);
socketCache.put(oldSock, ioStreams);
}
reader.close();
}
@ -864,14 +884,15 @@ public class DFSInputStream extends FSInputStream implements ByteBufferReadable
// Allow retry since there is no way of knowing whether the cached socket
// is good until we actually use it.
for (int retries = 0; retries <= nCachedConnRetry && fromCache; ++retries) {
Socket sock = null;
SocketAndStreams sockAndStreams = null;
// Don't use the cache on the last attempt - it's possible that there
// are arbitrarily many unusable sockets in the cache, but we don't
// want to fail the read.
if (retries < nCachedConnRetry) {
sock = socketCache.get(dnAddr);
sockAndStreams = socketCache.get(dnAddr);
}
if (sock == null) {
Socket sock;
if (sockAndStreams == null) {
fromCache = false;
sock = dfsClient.socketFactory.createSocket();
@ -895,6 +916,8 @@ public class DFSInputStream extends FSInputStream implements ByteBufferReadable
dfsClient.getRandomLocalInterfaceAddr(),
dfsClient.getConf().socketTimeout);
sock.setSoTimeout(dfsClient.getConf().socketTimeout);
} else {
sock = sockAndStreams.sock;
}
try {
@ -905,12 +928,18 @@ public class DFSInputStream extends FSInputStream implements ByteBufferReadable
blockToken,
startOffset, len,
bufferSize, verifyChecksum,
clientName);
clientName,
dfsClient.getDataEncryptionKey(),
sockAndStreams == null ? null : sockAndStreams.ioStreams);
return reader;
} catch (IOException ex) {
// Our socket is no good.
DFSClient.LOG.debug("Error making BlockReader. Closing stale " + sock, ex);
sock.close();
if (sockAndStreams != null) {
sockAndStreams.close();
} else {
sock.close();
}
err = ex;
}
}

View File

@ -24,7 +24,9 @@ import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.BufferOverflowException;
@ -56,6 +58,9 @@ import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor;
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
@ -867,16 +872,26 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable {
try {
sock = createSocketForPipeline(src, 2, dfsClient);
final long writeTimeout = dfsClient.getDatanodeWriteTimeout(2);
out = new DataOutputStream(new BufferedOutputStream(
NetUtils.getOutputStream(sock, writeTimeout),
OutputStream unbufOut = NetUtils.getOutputStream(sock, writeTimeout);
InputStream unbufIn = NetUtils.getInputStream(sock);
if (dfsClient.shouldEncryptData()) {
IOStreamPair encryptedStreams =
DataTransferEncryptor.getEncryptedStreams(
unbufOut, unbufIn, dfsClient.getDataEncryptionKey());
unbufOut = encryptedStreams.out;
unbufIn = encryptedStreams.in;
}
out = new DataOutputStream(new BufferedOutputStream(unbufOut,
HdfsConstants.SMALL_BUFFER_SIZE));
in = new DataInputStream(unbufIn);
//send the TRANSFER_BLOCK request
new Sender(out).transferBlock(block, blockToken, dfsClient.clientName,
targets);
out.flush();
//ack
in = new DataInputStream(NetUtils.getInputStream(sock));
BlockOpResponseProto response =
BlockOpResponseProto.parseFrom(HdfsProtoUtil.vintPrefixed(in));
if (SUCCESS != response.getStatus()) {
@ -1034,77 +1049,98 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable {
// persist blocks on namenode on next flush
persistBlocks.set(true);
boolean result = false;
DataOutputStream out = null;
try {
assert null == s : "Previous socket unclosed";
s = createSocketForPipeline(nodes[0], nodes.length, dfsClient);
long writeTimeout = dfsClient.getDatanodeWriteTimeout(nodes.length);
//
// Xmit header info to datanode
//
out = new DataOutputStream(new BufferedOutputStream(
NetUtils.getOutputStream(s, writeTimeout),
HdfsConstants.SMALL_BUFFER_SIZE));
assert null == blockReplyStream : "Previous blockReplyStream unclosed";
blockReplyStream = new DataInputStream(NetUtils.getInputStream(s));
// send the request
new Sender(out).writeBlock(block, accessToken, dfsClient.clientName,
nodes, null, recoveryFlag? stage.getRecoveryStage() : stage,
nodes.length, block.getNumBytes(), bytesSent, newGS, checksum);
// receive ack for connect
BlockOpResponseProto resp = BlockOpResponseProto.parseFrom(
HdfsProtoUtil.vintPrefixed(blockReplyStream));
pipelineStatus = resp.getStatus();
firstBadLink = resp.getFirstBadLink();
if (pipelineStatus != SUCCESS) {
if (pipelineStatus == Status.ERROR_ACCESS_TOKEN) {
throw new InvalidBlockTokenException(
"Got access token error for connect ack with firstBadLink as "
+ firstBadLink);
} else {
throw new IOException("Bad connect ack with firstBadLink as "
+ firstBadLink);
int refetchEncryptionKey = 1;
while (true) {
boolean result = false;
DataOutputStream out = null;
try {
assert null == s : "Previous socket unclosed";
assert null == blockReplyStream : "Previous blockReplyStream unclosed";
s = createSocketForPipeline(nodes[0], nodes.length, dfsClient);
long writeTimeout = dfsClient.getDatanodeWriteTimeout(nodes.length);
OutputStream unbufOut = NetUtils.getOutputStream(s, writeTimeout);
InputStream unbufIn = NetUtils.getInputStream(s);
if (dfsClient.shouldEncryptData()) {
IOStreamPair encryptedStreams =
DataTransferEncryptor.getEncryptedStreams(unbufOut,
unbufIn, dfsClient.getDataEncryptionKey());
unbufOut = encryptedStreams.out;
unbufIn = encryptedStreams.in;
}
}
assert null == blockStream : "Previous blockStream unclosed";
blockStream = out;
result = true; // success
} catch (IOException ie) {
DFSClient.LOG.info("Exception in createBlockOutputStream", ie);
// find the datanode that matches
if (firstBadLink.length() != 0) {
for (int i = 0; i < nodes.length; i++) {
if (nodes[i].getXferAddr().equals(firstBadLink)) {
errorIndex = i;
break;
out = new DataOutputStream(new BufferedOutputStream(unbufOut,
HdfsConstants.SMALL_BUFFER_SIZE));
blockReplyStream = new DataInputStream(unbufIn);
//
// Xmit header info to datanode
//
// send the request
new Sender(out).writeBlock(block, accessToken, dfsClient.clientName,
nodes, null, recoveryFlag? stage.getRecoveryStage() : stage,
nodes.length, block.getNumBytes(), bytesSent, newGS, checksum);
// receive ack for connect
BlockOpResponseProto resp = BlockOpResponseProto.parseFrom(
HdfsProtoUtil.vintPrefixed(blockReplyStream));
pipelineStatus = resp.getStatus();
firstBadLink = resp.getFirstBadLink();
if (pipelineStatus != SUCCESS) {
if (pipelineStatus == Status.ERROR_ACCESS_TOKEN) {
throw new InvalidBlockTokenException(
"Got access token error for connect ack with firstBadLink as "
+ firstBadLink);
} else {
throw new IOException("Bad connect ack with firstBadLink as "
+ firstBadLink);
}
}
} else {
errorIndex = 0;
}
hasError = true;
setLastException(ie);
result = false; // error
} finally {
if (!result) {
IOUtils.closeSocket(s);
s = null;
IOUtils.closeStream(out);
out = null;
IOUtils.closeStream(blockReplyStream);
blockReplyStream = null;
assert null == blockStream : "Previous blockStream unclosed";
blockStream = out;
result = true; // success
} catch (IOException ie) {
DFSClient.LOG.info("Exception in createBlockOutputStream", ie);
if (ie instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) {
DFSClient.LOG.info("Will fetch a new encryption key and retry, "
+ "encryption key was invalid when connecting to "
+ nodes[0].getXferAddr() + " : " + ie);
// The encryption key used is invalid.
refetchEncryptionKey--;
dfsClient.clearDataEncryptionKey();
// Don't close the socket/exclude this node just yet. Try again with
// a new encryption key.
continue;
}
// find the datanode that matches
if (firstBadLink.length() != 0) {
for (int i = 0; i < nodes.length; i++) {
if (nodes[i].getXferAddr().equals(firstBadLink)) {
errorIndex = i;
break;
}
}
} else {
errorIndex = 0;
}
hasError = true;
setLastException(ie);
result = false; // error
} finally {
if (!result) {
IOUtils.closeSocket(s);
s = null;
IOUtils.closeStream(out);
out = null;
IOUtils.closeStream(blockReplyStream);
blockReplyStream = null;
}
}
return result;
}
return result;
}
private LocatedBlock locateFollowingBlock(long start,

View File

@ -33,6 +33,7 @@ import org.apache.hadoop.fs.FSInputChecker;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
@ -458,7 +459,9 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader {
void sendReadResult(Socket sock, Status statusCode) {
assert !sentStatusCode : "already sent status code to " + sock;
try {
RemoteBlockReader2.writeReadResult(sock, statusCode);
RemoteBlockReader2.writeReadResult(
NetUtils.getOutputStream(sock, HdfsServerConstants.WRITE_TIMEOUT),
statusCode);
sentStatusCode = true;
} catch (IOException e) {
// It's ok not to be able to send this. But something is probably wrong.
@ -484,4 +487,11 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader {
throw new UnsupportedOperationException("readDirect unsupported in RemoteBlockReader");
}
@Override
public IOStreamPair getStreams() {
// This class doesn't support encryption, which is the only thing this
// method is used for. See HDFS-3637.
return null;
}
}

View File

@ -23,6 +23,7 @@ import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
@ -35,12 +36,15 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor;
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientReadStatusProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReadOpChecksumInfoProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
@ -83,7 +87,9 @@ public class RemoteBlockReader2 implements BlockReader {
static final Log LOG = LogFactory.getLog(RemoteBlockReader2.class);
Socket dnSock; //for now just sending the status code (e.g. checksumOk) after the read.
Socket dnSock;
// for now just sending the status code (e.g. checksumOk) after the read.
private IOStreamPair ioStreams;
private final ReadableByteChannel in;
private DataChecksum checksum;
@ -206,9 +212,9 @@ public class RemoteBlockReader2 implements BlockReader {
if (bytesNeededToFinish <= 0) {
readTrailingEmptyPacket();
if (verifyChecksum) {
sendReadResult(dnSock, Status.CHECKSUM_OK);
sendReadResult(Status.CHECKSUM_OK);
} else {
sendReadResult(dnSock, Status.SUCCESS);
sendReadResult(Status.SUCCESS);
}
}
}
@ -292,9 +298,11 @@ public class RemoteBlockReader2 implements BlockReader {
protected RemoteBlockReader2(String file, String bpid, long blockId,
ReadableByteChannel in, DataChecksum checksum, boolean verifyChecksum,
long startOffset, long firstChunkOffset, long bytesToRead, Socket dnSock) {
long startOffset, long firstChunkOffset, long bytesToRead, Socket dnSock,
IOStreamPair ioStreams) {
// Path is used only for printing block and file information in debug
this.dnSock = dnSock;
this.ioStreams = ioStreams;
this.in = in;
this.checksum = checksum;
this.verifyChecksum = verifyChecksum;
@ -369,24 +377,23 @@ public class RemoteBlockReader2 implements BlockReader {
* closing our connection (which we will re-open), but won't affect
* data correctness.
*/
void sendReadResult(Socket sock, Status statusCode) {
assert !sentStatusCode : "already sent status code to " + sock;
void sendReadResult(Status statusCode) {
assert !sentStatusCode : "already sent status code to " + dnSock;
try {
writeReadResult(sock, statusCode);
writeReadResult(ioStreams.out, statusCode);
sentStatusCode = true;
} catch (IOException e) {
// It's ok not to be able to send this. But something is probably wrong.
LOG.info("Could not send read status (" + statusCode + ") to datanode " +
sock.getInetAddress() + ": " + e.getMessage());
dnSock.getInetAddress() + ": " + e.getMessage());
}
}
/**
* Serialize the actual read result on the wire.
*/
static void writeReadResult(Socket sock, Status statusCode)
static void writeReadResult(OutputStream out, Status statusCode)
throws IOException {
OutputStream out = NetUtils.getOutputStream(sock, HdfsServerConstants.WRITE_TIMEOUT);
ClientReadStatusProto.newBuilder()
.setStatus(statusCode)
@ -434,25 +441,32 @@ public class RemoteBlockReader2 implements BlockReader {
* @param clientName Client name
* @return New BlockReader instance, or null on error.
*/
public static BlockReader newBlockReader( Socket sock, String file,
public static BlockReader newBlockReader(Socket sock, String file,
ExtendedBlock block,
Token<BlockTokenIdentifier> blockToken,
long startOffset, long len,
int bufferSize, boolean verifyChecksum,
String clientName)
String clientName,
DataEncryptionKey encryptionKey,
IOStreamPair ioStreams)
throws IOException {
ReadableByteChannel ch;
if (ioStreams.in instanceof SocketInputWrapper) {
ch = ((SocketInputWrapper)ioStreams.in).getReadableByteChannel();
} else {
ch = (ReadableByteChannel) ioStreams.in;
}
// in and out will be closed when sock is closed (by the caller)
final DataOutputStream out = new DataOutputStream(new BufferedOutputStream(
NetUtils.getOutputStream(sock,
HdfsServerConstants.WRITE_TIMEOUT)));
ioStreams.out));
new Sender(out).readBlock(block, blockToken, clientName, startOffset, len);
//
// Get bytes in block, set streams
// Get bytes in block
//
SocketInputWrapper sin = NetUtils.getInputStream(sock);
ReadableByteChannel ch = sin.getReadableByteChannel();
DataInputStream in = new DataInputStream(sin);
DataInputStream in = new DataInputStream(ioStreams.in);
BlockOpResponseProto status = BlockOpResponseProto.parseFrom(
vintPrefixed(in));
@ -474,7 +488,8 @@ public class RemoteBlockReader2 implements BlockReader {
}
return new RemoteBlockReader2(file, block.getBlockPoolId(), block.getBlockId(),
ch, checksum, verifyChecksum, startOffset, firstChunkOffset, len, sock);
ch, checksum, verifyChecksum, startOffset, firstChunkOffset, len, sock,
ioStreams);
}
static void checkSuccess(
@ -498,4 +513,9 @@ public class RemoteBlockReader2 implements BlockReader {
}
}
}
@Override
public IOStreamPair getStreams() {
return ioStreams;
}
}

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs;
import java.io.Closeable;
import java.net.Socket;
import java.net.SocketAddress;
@ -29,6 +30,8 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.LinkedListMultimap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
import org.apache.hadoop.io.IOUtils;
/**
@ -37,7 +40,7 @@ import org.apache.hadoop.io.IOUtils;
class SocketCache {
static final Log LOG = LogFactory.getLog(SocketCache.class);
private final LinkedListMultimap<SocketAddress, Socket> multimap;
private final LinkedListMultimap<SocketAddress, SocketAndStreams> multimap;
private final int capacity;
/**
@ -57,21 +60,21 @@ class SocketCache {
* @param remote Remote address the socket is connected to.
* @return A socket with unknown state, possibly closed underneath. Or null.
*/
public synchronized Socket get(SocketAddress remote) {
public synchronized SocketAndStreams get(SocketAddress remote) {
if (capacity <= 0) { // disabled
return null;
}
List<Socket> socklist = multimap.get(remote);
List<SocketAndStreams> socklist = multimap.get(remote);
if (socklist == null) {
return null;
}
Iterator<Socket> iter = socklist.iterator();
Iterator<SocketAndStreams> iter = socklist.iterator();
while (iter.hasNext()) {
Socket candidate = iter.next();
SocketAndStreams candidate = iter.next();
iter.remove();
if (!candidate.isClosed()) {
if (!candidate.sock.isClosed()) {
return candidate;
}
}
@ -82,10 +85,11 @@ class SocketCache {
* Give an unused socket to the cache.
* @param sock socket not used by anyone.
*/
public synchronized void put(Socket sock) {
public synchronized void put(Socket sock, IOStreamPair ioStreams) {
SocketAndStreams s = new SocketAndStreams(sock, ioStreams);
if (capacity <= 0) {
// Cache disabled.
IOUtils.closeSocket(sock);
s.close();
return;
}
@ -102,7 +106,7 @@ class SocketCache {
if (capacity == multimap.size()) {
evictOldest();
}
multimap.put(remoteAddr, sock);
multimap.put(remoteAddr, new SocketAndStreams(sock, ioStreams));
}
public synchronized int size() {
@ -113,23 +117,23 @@ class SocketCache {
* Evict the oldest entry in the cache.
*/
private synchronized void evictOldest() {
Iterator<Entry<SocketAddress, Socket>> iter =
Iterator<Entry<SocketAddress, SocketAndStreams>> iter =
multimap.entries().iterator();
if (!iter.hasNext()) {
throw new IllegalStateException("Cannot evict from empty cache!");
}
Entry<SocketAddress, Socket> entry = iter.next();
Entry<SocketAddress, SocketAndStreams> entry = iter.next();
iter.remove();
Socket sock = entry.getValue();
IOUtils.closeSocket(sock);
SocketAndStreams s = entry.getValue();
s.close();
}
/**
* Empty the cache, and close all sockets.
*/
public synchronized void clear() {
for (Socket sock : multimap.values()) {
IOUtils.closeSocket(sock);
for (SocketAndStreams s : multimap.values()) {
s.close();
}
multimap.clear();
}
@ -138,5 +142,25 @@ class SocketCache {
protected void finalize() {
clear();
}
@InterfaceAudience.Private
static class SocketAndStreams implements Closeable {
public final Socket sock;
public final IOStreamPair ioStreams;
public SocketAndStreams(Socket s, IOStreamPair ioStreams) {
this.sock = s;
this.ioStreams = ioStreams;
}
@Override
public void close() {
if (ioStreams != null) {
IOUtils.closeStream(ioStreams.in);
IOUtils.closeStream(ioStreams.out);
}
IOUtils.closeSocket(sock);
}
}
}

View File

@ -44,6 +44,7 @@ import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.KerberosInfo;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenInfo;
import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSelector;
@ -941,4 +942,11 @@ public interface ClientProtocol {
*/
public void cancelDelegationToken(Token<DelegationTokenIdentifier> token)
throws IOException;
/**
* @return encryption key so a client can encrypt data sent via the
* DataTransferProtocol to/from DataNodes.
* @throws IOException
*/
public DataEncryptionKey getDataEncryptionKey() throws IOException;
}

View File

@ -0,0 +1,505 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.protocol.datatransfer;
import static org.apache.hadoop.hdfs.protocol.HdfsProtoUtil.vintPrefixed;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Map;
import java.util.TreeMap;
import javax.security.auth.callback.Callback;
import javax.security.auth.callback.CallbackHandler;
import javax.security.auth.callback.NameCallback;
import javax.security.auth.callback.PasswordCallback;
import javax.security.auth.callback.UnsupportedCallbackException;
import javax.security.sasl.AuthorizeCallback;
import javax.security.sasl.RealmCallback;
import javax.security.sasl.RealmChoiceCallback;
import javax.security.sasl.Sasl;
import javax.security.sasl.SaslClient;
import javax.security.sasl.SaslException;
import javax.security.sasl.SaslServer;
import org.apache.commons.codec.binary.Base64;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto.DataTransferEncryptorStatus;
import org.apache.hadoop.hdfs.security.token.block.BlockPoolTokenSecretManager;
import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
import org.apache.hadoop.security.SaslInputStream;
import org.apache.hadoop.security.SaslOutputStream;
import com.google.common.collect.Maps;
import com.google.protobuf.ByteString;
/**
* A class which, given connected input/output streams, will perform a
* handshake using those streams based on SASL to produce new Input/Output
* streams which will encrypt/decrypt all data written/read from said streams.
* Much of this is inspired by or borrowed from the TSaslTransport in Apache
* Thrift, but with some HDFS-specific tweaks.
*/
@InterfaceAudience.Private
public class DataTransferEncryptor {
public static final Log LOG = LogFactory.getLog(DataTransferEncryptor.class);
/**
* Sent by clients and validated by servers. We use a number that's unlikely
* to ever be sent as the value of the DATA_TRANSFER_VERSION.
*/
private static final int ENCRYPTED_TRANSFER_MAGIC_NUMBER = 0xDEADBEEF;
/**
* Delimiter for the three-part SASL username string.
*/
private static final String NAME_DELIMITER = " ";
// This has to be set as part of the SASL spec, but it don't matter for
// our purposes, but may not be empty. It's sent over the wire, so use
// a short string.
private static final String SERVER_NAME = "0";
private static final String PROTOCOL = "hdfs";
private static final String MECHANISM = "DIGEST-MD5";
private static final Map<String, String> SASL_PROPS = new TreeMap<String, String>();
static {
SASL_PROPS.put(Sasl.QOP, "auth-conf");
SASL_PROPS.put(Sasl.SERVER_AUTH, "true");
}
/**
* Factory method for DNs, where the nonce, keyId, and encryption key are not
* yet known. The nonce and keyId will be sent by the client, and the DN
* will then use those pieces of info and the secret key shared with the NN
* to determine the encryptionKey used for the SASL handshake/encryption.
*
* Establishes a secure connection assuming that the party on the other end
* has the same shared secret. This does a SASL connection handshake, but not
* a general-purpose one. It's specific to the MD5-DIGEST SASL mechanism with
* auth-conf enabled. In particular, it doesn't support an arbitrary number of
* challenge/response rounds, and we know that the client will never have an
* initial response, so we don't check for one.
*
* @param underlyingOut output stream to write to the other party
* @param underlyingIn input stream to read from the other party
* @param blockPoolTokenSecretManager secret manager capable of constructing
* encryption key based on keyId, blockPoolId, and nonce
* @return a pair of streams which wrap the given streams and encrypt/decrypt
* all data read/written
* @throws IOException in the event of error
*/
public static IOStreamPair getEncryptedStreams(
OutputStream underlyingOut, InputStream underlyingIn,
BlockPoolTokenSecretManager blockPoolTokenSecretManager,
String encryptionAlgorithm) throws IOException {
DataInputStream in = new DataInputStream(underlyingIn);
DataOutputStream out = new DataOutputStream(underlyingOut);
Map<String, String> saslProps = Maps.newHashMap(SASL_PROPS);
saslProps.put("com.sun.security.sasl.digest.cipher", encryptionAlgorithm);
if (LOG.isDebugEnabled()) {
LOG.debug("Server using encryption algorithm " + encryptionAlgorithm);
}
SaslParticipant sasl = new SaslParticipant(Sasl.createSaslServer(MECHANISM,
PROTOCOL, SERVER_NAME, saslProps,
new SaslServerCallbackHandler(blockPoolTokenSecretManager)));
int magicNumber = in.readInt();
if (magicNumber != ENCRYPTED_TRANSFER_MAGIC_NUMBER) {
throw new InvalidMagicNumberException(magicNumber);
}
try {
// step 1
performSaslStep1(out, in, sasl);
// step 2 (server-side only)
byte[] remoteResponse = readSaslMessage(in);
byte[] localResponse = sasl.evaluateChallengeOrResponse(remoteResponse);
sendSaslMessage(out, localResponse);
// SASL handshake is complete
checkSaslComplete(sasl);
return sasl.createEncryptedStreamPair(out, in);
} catch (IOException ioe) {
if (ioe instanceof SaslException &&
ioe.getCause() != null &&
ioe.getCause() instanceof InvalidEncryptionKeyException) {
// This could just be because the client is long-lived and hasn't gotten
// a new encryption key from the NN in a while. Upon receiving this
// error, the client will get a new encryption key from the NN and retry
// connecting to this DN.
sendInvalidKeySaslErrorMessage(out, ioe.getCause().getMessage());
} else {
sendGenericSaslErrorMessage(out, ioe.getMessage());
}
throw ioe;
}
}
/**
* Factory method for clients, where the encryption token is already created.
*
* Establishes a secure connection assuming that the party on the other end
* has the same shared secret. This does a SASL connection handshake, but not
* a general-purpose one. It's specific to the MD5-DIGEST SASL mechanism with
* auth-conf enabled. In particular, it doesn't support an arbitrary number of
* challenge/response rounds, and we know that the client will never have an
* initial response, so we don't check for one.
*
* @param underlyingOut output stream to write to the other party
* @param underlyingIn input stream to read from the other party
* @param encryptionKey all info required to establish an encrypted stream
* @return a pair of streams which wrap the given streams and encrypt/decrypt
* all data read/written
* @throws IOException in the event of error
*/
public static IOStreamPair getEncryptedStreams(
OutputStream underlyingOut, InputStream underlyingIn,
DataEncryptionKey encryptionKey)
throws IOException {
Map<String, String> saslProps = Maps.newHashMap(SASL_PROPS);
saslProps.put("com.sun.security.sasl.digest.cipher",
encryptionKey.encryptionAlgorithm);
if (LOG.isDebugEnabled()) {
LOG.debug("Client using encryption algorithm " +
encryptionKey.encryptionAlgorithm);
}
DataOutputStream out = new DataOutputStream(underlyingOut);
DataInputStream in = new DataInputStream(underlyingIn);
String userName = getUserNameFromEncryptionKey(encryptionKey);
SaslParticipant sasl = new SaslParticipant(Sasl.createSaslClient(
new String[] { MECHANISM }, userName, PROTOCOL, SERVER_NAME, saslProps,
new SaslClientCallbackHandler(encryptionKey.encryptionKey, userName)));
out.writeInt(ENCRYPTED_TRANSFER_MAGIC_NUMBER);
out.flush();
try {
// Start of handshake - "initial response" in SASL terminology.
sendSaslMessage(out, new byte[0]);
// step 1
performSaslStep1(out, in, sasl);
// step 2 (client-side only)
byte[] remoteResponse = readSaslMessage(in);
byte[] localResponse = sasl.evaluateChallengeOrResponse(remoteResponse);
assert localResponse == null;
// SASL handshake is complete
checkSaslComplete(sasl);
return sasl.createEncryptedStreamPair(out, in);
} catch (IOException ioe) {
sendGenericSaslErrorMessage(out, ioe.getMessage());
throw ioe;
}
}
private static void performSaslStep1(DataOutputStream out, DataInputStream in,
SaslParticipant sasl) throws IOException {
byte[] remoteResponse = readSaslMessage(in);
byte[] localResponse = sasl.evaluateChallengeOrResponse(remoteResponse);
sendSaslMessage(out, localResponse);
}
private static void checkSaslComplete(SaslParticipant sasl) throws IOException {
if (!sasl.isComplete()) {
throw new IOException("Failed to complete SASL handshake");
}
if (!sasl.supportsConfidentiality()) {
throw new IOException("SASL handshake completed, but channel does not " +
"support encryption");
}
}
private static void sendSaslMessage(DataOutputStream out, byte[] payload)
throws IOException {
sendSaslMessage(out, DataTransferEncryptorStatus.SUCCESS, payload, null);
}
private static void sendInvalidKeySaslErrorMessage(DataOutputStream out,
String message) throws IOException {
sendSaslMessage(out, DataTransferEncryptorStatus.ERROR_UNKNOWN_KEY, null,
message);
}
private static void sendGenericSaslErrorMessage(DataOutputStream out,
String message) throws IOException {
sendSaslMessage(out, DataTransferEncryptorStatus.ERROR, null, message);
}
private static void sendSaslMessage(OutputStream out,
DataTransferEncryptorStatus status, byte[] payload, String message)
throws IOException {
DataTransferEncryptorMessageProto.Builder builder =
DataTransferEncryptorMessageProto.newBuilder();
builder.setStatus(status);
if (payload != null) {
builder.setPayload(ByteString.copyFrom(payload));
}
if (message != null) {
builder.setMessage(message);
}
DataTransferEncryptorMessageProto proto = builder.build();
proto.writeDelimitedTo(out);
out.flush();
}
private static byte[] readSaslMessage(DataInputStream in) throws IOException {
DataTransferEncryptorMessageProto proto =
DataTransferEncryptorMessageProto.parseFrom(vintPrefixed(in));
if (proto.getStatus() == DataTransferEncryptorStatus.ERROR_UNKNOWN_KEY) {
throw new InvalidEncryptionKeyException(proto.getMessage());
} else if (proto.getStatus() == DataTransferEncryptorStatus.ERROR) {
throw new IOException(proto.getMessage());
} else {
return proto.getPayload().toByteArray();
}
}
/**
* Set the encryption key when asked by the server-side SASL object.
*/
private static class SaslServerCallbackHandler implements CallbackHandler {
private BlockPoolTokenSecretManager blockPoolTokenSecretManager;
public SaslServerCallbackHandler(BlockPoolTokenSecretManager
blockPoolTokenSecretManager) {
this.blockPoolTokenSecretManager = blockPoolTokenSecretManager;
}
@Override
public void handle(Callback[] callbacks) throws IOException,
UnsupportedCallbackException {
NameCallback nc = null;
PasswordCallback pc = null;
AuthorizeCallback ac = null;
for (Callback callback : callbacks) {
if (callback instanceof AuthorizeCallback) {
ac = (AuthorizeCallback) callback;
} else if (callback instanceof PasswordCallback) {
pc = (PasswordCallback) callback;
} else if (callback instanceof NameCallback) {
nc = (NameCallback) callback;
} else if (callback instanceof RealmCallback) {
continue; // realm is ignored
} else {
throw new UnsupportedCallbackException(callback,
"Unrecognized SASL DIGEST-MD5 Callback: " + callback);
}
}
if (pc != null) {
byte[] encryptionKey = getEncryptionKeyFromUserName(
blockPoolTokenSecretManager, nc.getDefaultName());
pc.setPassword(encryptionKeyToPassword(encryptionKey));
}
if (ac != null) {
ac.setAuthorized(true);
ac.setAuthorizedID(ac.getAuthorizationID());
}
}
}
/**
* Set the encryption key when asked by the client-side SASL object.
*/
private static class SaslClientCallbackHandler implements CallbackHandler {
private byte[] encryptionKey;
private String userName;
public SaslClientCallbackHandler(byte[] encryptionKey, String userName) {
this.encryptionKey = encryptionKey;
this.userName = userName;
}
@Override
public void handle(Callback[] callbacks) throws IOException,
UnsupportedCallbackException {
NameCallback nc = null;
PasswordCallback pc = null;
RealmCallback rc = null;
for (Callback callback : callbacks) {
if (callback instanceof RealmChoiceCallback) {
continue;
} else if (callback instanceof NameCallback) {
nc = (NameCallback) callback;
} else if (callback instanceof PasswordCallback) {
pc = (PasswordCallback) callback;
} else if (callback instanceof RealmCallback) {
rc = (RealmCallback) callback;
} else {
throw new UnsupportedCallbackException(callback,
"Unrecognized SASL client callback");
}
}
if (nc != null) {
nc.setName(userName);
}
if (pc != null) {
pc.setPassword(encryptionKeyToPassword(encryptionKey));
}
if (rc != null) {
rc.setText(rc.getDefaultText());
}
}
}
/**
* The SASL username consists of the keyId, blockPoolId, and nonce with the
* first two encoded as Strings, and the third encoded using Base64. The
* fields are each separated by a single space.
*
* @param encryptionKey the encryption key to encode as a SASL username.
* @return encoded username containing keyId, blockPoolId, and nonce
*/
private static String getUserNameFromEncryptionKey(
DataEncryptionKey encryptionKey) {
return encryptionKey.keyId + NAME_DELIMITER +
encryptionKey.blockPoolId + NAME_DELIMITER +
new String(Base64.encodeBase64(encryptionKey.nonce, false));
}
/**
* Given a secret manager and a username encoded as described above, determine
* the encryption key.
*
* @param blockPoolTokenSecretManager to determine the encryption key.
* @param userName containing the keyId, blockPoolId, and nonce.
* @return secret encryption key.
* @throws IOException
*/
private static byte[] getEncryptionKeyFromUserName(
BlockPoolTokenSecretManager blockPoolTokenSecretManager, String userName)
throws IOException {
String[] nameComponents = userName.split(NAME_DELIMITER);
if (nameComponents.length != 3) {
throw new IOException("Provided name '" + userName + "' has " +
nameComponents.length + " components instead of the expected 3.");
}
int keyId = Integer.parseInt(nameComponents[0]);
String blockPoolId = nameComponents[1];
byte[] nonce = Base64.decodeBase64(nameComponents[2]);
return blockPoolTokenSecretManager.retrieveDataEncryptionKey(keyId,
blockPoolId, nonce);
}
private static char[] encryptionKeyToPassword(byte[] encryptionKey) {
return new String(Base64.encodeBase64(encryptionKey, false)).toCharArray();
}
/**
* Strongly inspired by Thrift's TSaslTransport class.
*
* Used to abstract over the <code>SaslServer</code> and
* <code>SaslClient</code> classes, which share a lot of their interface, but
* unfortunately don't share a common superclass.
*/
private static class SaslParticipant {
// One of these will always be null.
public SaslServer saslServer;
public SaslClient saslClient;
public SaslParticipant(SaslServer saslServer) {
this.saslServer = saslServer;
}
public SaslParticipant(SaslClient saslClient) {
this.saslClient = saslClient;
}
public byte[] evaluateChallengeOrResponse(byte[] challengeOrResponse) throws SaslException {
if (saslClient != null) {
return saslClient.evaluateChallenge(challengeOrResponse);
} else {
return saslServer.evaluateResponse(challengeOrResponse);
}
}
public boolean isComplete() {
if (saslClient != null)
return saslClient.isComplete();
else
return saslServer.isComplete();
}
public boolean supportsConfidentiality() {
String qop = null;
if (saslClient != null) {
qop = (String) saslClient.getNegotiatedProperty(Sasl.QOP);
} else {
qop = (String) saslServer.getNegotiatedProperty(Sasl.QOP);
}
return qop != null && qop.equals("auth-conf");
}
// Return some input/output streams that will henceforth have their
// communication encrypted.
private IOStreamPair createEncryptedStreamPair(
DataOutputStream out, DataInputStream in) {
if (saslClient != null) {
return new IOStreamPair(
new SaslInputStream(in, saslClient),
new SaslOutputStream(out, saslClient));
} else {
return new IOStreamPair(
new SaslInputStream(in, saslServer),
new SaslOutputStream(out, saslServer));
}
}
}
@InterfaceAudience.Private
public static class InvalidMagicNumberException extends IOException {
private static final long serialVersionUID = 1L;
public InvalidMagicNumberException(int magicNumber) {
super(String.format("Received %x instead of %x from client.",
magicNumber, ENCRYPTED_TRANSFER_MAGIC_NUMBER));
}
}
}

View File

@ -0,0 +1,37 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.protocol.datatransfer;
import java.io.InputStream;
import java.io.OutputStream;
import org.apache.hadoop.classification.InterfaceAudience;
/**
* A little struct class to wrap an InputStream and an OutputStream.
*/
@InterfaceAudience.Private
public class IOStreamPair {
public final InputStream in;
public final OutputStream out;
public IOStreamPair(InputStream in, OutputStream out) {
this.in = in;
this.out = out;
}
}

View File

@ -0,0 +1,40 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.protocol.datatransfer;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
/**
* Encryption key verification failed.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class InvalidEncryptionKeyException extends IOException {
private static final long serialVersionUID = 0l;
public InvalidEncryptionKeyException() {
super();
}
public InvalidEncryptionKeyException(String msg) {
super(msg);
}
}

View File

@ -38,10 +38,10 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProt
@InterfaceAudience.Private
@InterfaceStability.Evolving
public abstract class Receiver implements DataTransferProtocol {
protected final DataInputStream in;
/** Create a receiver for DataTransferProtocol with a socket. */
protected Receiver(final DataInputStream in) {
protected DataInputStream in;
/** Initialize a receiver for DataTransferProtocol with a socket. */
protected void initialize(final DataInputStream in) {
this.in = in;
}

View File

@ -58,6 +58,8 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.FsyncR
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.FsyncResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetAdditionalDatanodeRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetAdditionalDatanodeResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetDataEncryptionKeyRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetDataEncryptionKeyResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetBlockLocationsRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetBlockLocationsResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetBlockLocationsResponseProto.Builder;
@ -127,6 +129,7 @@ import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockTokenIdentifierProt
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeIDProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeInfoProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.LocatedBlockProto;
import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
import org.apache.hadoop.io.Text;
@ -830,4 +833,18 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
throw new ServiceException(e);
}
}
@Override
public GetDataEncryptionKeyResponseProto getDataEncryptionKey(
RpcController controller, GetDataEncryptionKeyRequestProto request)
throws ServiceException {
try {
DataEncryptionKey encryptionKey = server.getDataEncryptionKey();
return GetDataEncryptionKeyResponseProto.newBuilder()
.setDataEncryptionKey(PBHelper.convert(encryptionKey))
.build();
} catch (IOException e) {
throw new ServiceException(e);
}
}
}

View File

@ -63,6 +63,7 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.Distri
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.FinalizeUpgradeRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.FsyncRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetAdditionalDatanodeRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetDataEncryptionKeyRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetBlockLocationsRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetBlockLocationsResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetContentSummaryRequestProto;
@ -99,6 +100,7 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetSaf
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetTimesRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdateBlockForPipelineRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdatePipelineRequestProto;
import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
@ -815,9 +817,22 @@ public class ClientNamenodeProtocolTranslatorPB implements
ClientNamenodeProtocolPB.class, RPC.RpcKind.RPC_PROTOCOL_BUFFER,
RPC.getProtocolVersion(ClientNamenodeProtocolPB.class), methodName);
}
@Override
public DataEncryptionKey getDataEncryptionKey() throws IOException {
GetDataEncryptionKeyRequestProto req = GetDataEncryptionKeyRequestProto
.newBuilder().build();
try {
return PBHelper.convert(rpcProxy.getDataEncryptionKey(null, req)
.getDataEncryptionKey());
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
}
@Override
public Object getUnderlyingProxyObject() {
return rpcProxy;
}
}

View File

@ -62,6 +62,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ReceivedDele
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.RegisterCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.StorageReportProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.UpgradeCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DataEncryptionKeyProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockKeyProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockTokenIdentifierProto;
@ -96,6 +97,7 @@ import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ReplicaStateProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageInfoProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.UpgradeStatusReportProto;
import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.JournalInfoProto;
import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
import org.apache.hadoop.hdfs.security.token.block.BlockKey;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
@ -970,12 +972,37 @@ public class PBHelper {
.setIsLastBlockComplete(lb.isLastBlockComplete()).build();
}
// DataEncryptionKey
public static DataEncryptionKey convert(DataEncryptionKeyProto bet) {
String encryptionAlgorithm = bet.getEncryptionAlgorithm();
return new DataEncryptionKey(bet.getKeyId(),
bet.getBlockPoolId(),
bet.getNonce().toByteArray(),
bet.getEncryptionKey().toByteArray(),
bet.getExpiryDate(),
encryptionAlgorithm.isEmpty() ? null : encryptionAlgorithm);
}
public static DataEncryptionKeyProto convert(DataEncryptionKey bet) {
DataEncryptionKeyProto.Builder b = DataEncryptionKeyProto.newBuilder()
.setKeyId(bet.keyId)
.setBlockPoolId(bet.blockPoolId)
.setNonce(ByteString.copyFrom(bet.nonce))
.setEncryptionKey(ByteString.copyFrom(bet.encryptionKey))
.setExpiryDate(bet.expiryDate);
if (bet.encryptionAlgorithm != null) {
b.setEncryptionAlgorithm(bet.encryptionAlgorithm);
}
return b.build();
}
public static FsServerDefaults convert(FsServerDefaultsProto fs) {
if (fs == null) return null;
return new FsServerDefaults(
fs.getBlockSize(), fs.getBytesPerChecksum(),
fs.getWritePacketSize(), (short) fs.getReplication(),
fs.getFileBufferSize());
fs.getFileBufferSize(),
fs.getEncryptDataTransfer());
}
public static FsServerDefaultsProto convert(FsServerDefaults fs) {
@ -983,7 +1010,10 @@ public class PBHelper {
return FsServerDefaultsProto.newBuilder().
setBlockSize(fs.getBlockSize()).
setBytesPerChecksum(fs.getBytesPerChecksum()).
setWritePacketSize(fs.getWritePacketSize()).setReplication(fs.getReplication()).setFileBufferSize(fs.getFileBufferSize()).build();
setWritePacketSize(fs.getWritePacketSize())
.setReplication(fs.getReplication())
.setFileBufferSize(fs.getFileBufferSize())
.setEncryptDataTransfer(fs.getEncryptDataTransfer()).build();
}
public static FsPermissionProto convert(FsPermission p) {

View File

@ -119,4 +119,13 @@ public class BlockPoolTokenSecretManager extends
btsm.clearAllKeysForTesting();
}
}
public DataEncryptionKey generateDataEncryptionKey(String blockPoolId) {
return get(blockPoolId).generateDataEncryptionKey();
}
public byte[] retrieveDataEncryptionKey(int keyId, String blockPoolId,
byte[] nonce) throws IOException {
return get(blockPoolId).retrieveDataEncryptionKey(keyId, nonce);
}
}

View File

@ -32,6 +32,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.SecretManager;
@ -74,6 +75,10 @@ public class BlockTokenSecretManager extends
private BlockKey currentKey;
private BlockKey nextKey;
private Map<Integer, BlockKey> allKeys;
private String blockPoolId;
private String encryptionAlgorithm;
private SecureRandom nonceGenerator = new SecureRandom();
public static enum AccessMode {
READ, WRITE, COPY, REPLACE
@ -86,8 +91,9 @@ public class BlockTokenSecretManager extends
* @param tokenLifetime how long an individual token is valid
*/
public BlockTokenSecretManager(long keyUpdateInterval,
long tokenLifetime) {
this(false, keyUpdateInterval, tokenLifetime);
long tokenLifetime, String blockPoolId, String encryptionAlgorithm) {
this(false, keyUpdateInterval, tokenLifetime, blockPoolId,
encryptionAlgorithm);
}
/**
@ -100,8 +106,10 @@ public class BlockTokenSecretManager extends
* @param otherNnId the NN ID of the other NN in an HA setup
*/
public BlockTokenSecretManager(long keyUpdateInterval,
long tokenLifetime, int nnIndex) {
this(true, keyUpdateInterval, tokenLifetime);
long tokenLifetime, int nnIndex, String blockPoolId,
String encryptionAlgorithm) {
this(true, keyUpdateInterval, tokenLifetime, blockPoolId,
encryptionAlgorithm);
Preconditions.checkArgument(nnIndex == 0 || nnIndex == 1);
this.nnIndex = nnIndex;
setSerialNo(new SecureRandom().nextInt());
@ -109,17 +117,24 @@ public class BlockTokenSecretManager extends
}
private BlockTokenSecretManager(boolean isMaster, long keyUpdateInterval,
long tokenLifetime) {
long tokenLifetime, String blockPoolId, String encryptionAlgorithm) {
this.isMaster = isMaster;
this.keyUpdateInterval = keyUpdateInterval;
this.tokenLifetime = tokenLifetime;
this.allKeys = new HashMap<Integer, BlockKey>();
this.blockPoolId = blockPoolId;
this.encryptionAlgorithm = encryptionAlgorithm;
generateKeys();
}
@VisibleForTesting
public synchronized void setSerialNo(int serialNo) {
this.serialNo = (serialNo & LOW_MASK) | (nnIndex << 31);
}
public void setBlockPoolId(String blockPoolId) {
this.blockPoolId = blockPoolId;
}
/** Initialize block keys */
private synchronized void generateKeys() {
@ -371,6 +386,49 @@ public class BlockTokenSecretManager extends
return createPassword(identifier.getBytes(), key.getKey());
}
/**
* Generate a data encryption key for this block pool, using the current
* BlockKey.
*
* @return a data encryption key which may be used to encrypt traffic
* over the DataTransferProtocol
*/
public DataEncryptionKey generateDataEncryptionKey() {
byte[] nonce = new byte[8];
nonceGenerator.nextBytes(nonce);
BlockKey key = null;
synchronized (this) {
key = currentKey;
}
byte[] encryptionKey = createPassword(nonce, key.getKey());
return new DataEncryptionKey(key.getKeyId(), blockPoolId, nonce,
encryptionKey, Time.now() + tokenLifetime,
encryptionAlgorithm);
}
/**
* Recreate an encryption key based on the given key id and nonce.
*
* @param keyId identifier of the secret key used to generate the encryption key.
* @param nonce random value used to create the encryption key
* @return the encryption key which corresponds to this (keyId, blockPoolId, nonce)
* @throws InvalidToken
* @throws InvalidEncryptionKeyException
*/
public byte[] retrieveDataEncryptionKey(int keyId, byte[] nonce)
throws InvalidEncryptionKeyException {
BlockKey key = null;
synchronized (this) {
key = allKeys.get(keyId);
if (key == null) {
throw new InvalidEncryptionKeyException("Can't re-compute encryption key"
+ " for nonce, since the required block key (keyID=" + keyId
+ ") doesn't exist. Current key: " + currentKey.getKeyId());
}
}
return createPassword(nonce, key.getKey());
}
@VisibleForTesting
public synchronized void setKeyUpdateIntervalForTesting(long millis) {
this.keyUpdateInterval = millis;

View File

@ -0,0 +1,50 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.security.token.block;
import org.apache.hadoop.classification.InterfaceAudience;
/**
* A little struct class to contain all fields required to perform encryption of
* the DataTransferProtocol.
*/
@InterfaceAudience.Private
public class DataEncryptionKey {
public final int keyId;
public final String blockPoolId;
public final byte[] nonce;
public final byte[] encryptionKey;
public final long expiryDate;
public final String encryptionAlgorithm;
public DataEncryptionKey(int keyId, String blockPoolId, byte[] nonce,
byte[] encryptionKey, long expiryDate, String encryptionAlgorithm) {
this.keyId = keyId;
this.blockPoolId = blockPoolId;
this.nonce = nonce;
this.encryptionKey = encryptionKey;
this.expiryDate = expiryDate;
this.encryptionAlgorithm = encryptionAlgorithm;
}
@Override
public String toString() {
return keyId + "/" + blockPoolId + "/" + nonce.length + "/" +
encryptionKey.length;
}
}

View File

@ -24,6 +24,8 @@ import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.net.URI;
import java.text.DateFormat;
@ -57,6 +59,8 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor;
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
@ -311,11 +315,22 @@ public class Balancer {
NetUtils.createSocketAddr(target.datanode.getXferAddr()),
HdfsServerConstants.READ_TIMEOUT);
sock.setKeepAlive(true);
out = new DataOutputStream( new BufferedOutputStream(
sock.getOutputStream(), HdfsConstants.IO_FILE_BUFFER_SIZE));
OutputStream unbufOut = sock.getOutputStream();
InputStream unbufIn = sock.getInputStream();
if (nnc.getDataEncryptionKey() != null) {
IOStreamPair encryptedStreams =
DataTransferEncryptor.getEncryptedStreams(
unbufOut, unbufIn, nnc.getDataEncryptionKey());
unbufOut = encryptedStreams.out;
unbufIn = encryptedStreams.in;
}
out = new DataOutputStream(new BufferedOutputStream(unbufOut,
HdfsConstants.IO_FILE_BUFFER_SIZE));
in = new DataInputStream(new BufferedInputStream(unbufIn,
HdfsConstants.IO_FILE_BUFFER_SIZE));
sendRequest(out);
in = new DataInputStream( new BufferedInputStream(
sock.getInputStream(), HdfsConstants.IO_FILE_BUFFER_SIZE));
receiveResponse(in);
bytesMoved.inc(block.getNumBytes());
LOG.info( "Moving block " + block.getBlock().getBlockId() +

View File

@ -29,10 +29,12 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.NameNodeProxies;
import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
@ -60,10 +62,12 @@ class NameNodeConnector {
final OutputStream out;
private final boolean isBlockTokenEnabled;
private final boolean encryptDataTransfer;
private boolean shouldRun;
private long keyUpdaterInterval;
private BlockTokenSecretManager blockTokenSecretManager;
private Daemon keyupdaterthread; // AccessKeyUpdater thread
private DataEncryptionKey encryptionKey;
NameNodeConnector(URI nameNodeUri,
Configuration conf) throws IOException {
@ -88,8 +92,11 @@ class NameNodeConnector {
LOG.info("Block token params received from NN: keyUpdateInterval="
+ blockKeyUpdateInterval / (60 * 1000) + " min(s), tokenLifetime="
+ blockTokenLifetime / (60 * 1000) + " min(s)");
String encryptionAlgorithm = conf.get(
DFSConfigKeys.DFS_DATA_ENCRYPTION_ALGORITHM_KEY);
this.blockTokenSecretManager = new BlockTokenSecretManager(
blockKeyUpdateInterval, blockTokenLifetime);
blockKeyUpdateInterval, blockTokenLifetime, blockpoolID,
encryptionAlgorithm);
this.blockTokenSecretManager.addKeys(keys);
/*
* Balancer should sync its block keys with NN more frequently than NN
@ -102,7 +109,8 @@ class NameNodeConnector {
this.shouldRun = true;
this.keyupdaterthread.start();
}
this.encryptDataTransfer = fs.getServerDefaults(new Path("/"))
.getEncryptDataTransfer();
// Check if there is another balancer running.
// Exit if there is another one running.
out = checkAndMarkRunningBalancer();
@ -126,6 +134,20 @@ class NameNodeConnector {
BlockTokenSecretManager.AccessMode.COPY));
}
}
DataEncryptionKey getDataEncryptionKey()
throws IOException {
if (encryptDataTransfer) {
synchronized (this) {
if (encryptionKey == null) {
encryptionKey = blockTokenSecretManager.generateDataEncryptionKey();
}
return encryptionKey;
}
} else {
return null;
}
}
/* The idea for making sure that there is no more than one balancer
* running in an HDFS is to create a file in the HDFS, writes the IP address
@ -208,4 +230,4 @@ class NameNodeConnector {
}
}
}
}
}

View File

@ -50,6 +50,7 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager.AccessMode;
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
@ -206,6 +207,9 @@ public class BlockManager {
/** variable to enable check for enough racks */
final boolean shouldCheckForEnoughRacks;
// whether or not to issue block encryption keys.
final boolean encryptDataTransfer;
/**
* When running inside a Standby node, the node may receive block reports
@ -286,12 +290,18 @@ public class BlockManager {
this.replicationRecheckInterval =
conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY,
DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_DEFAULT) * 1000L;
this.encryptDataTransfer =
conf.getBoolean(DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY,
DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_DEFAULT);
LOG.info("defaultReplication = " + defaultReplication);
LOG.info("maxReplication = " + maxReplication);
LOG.info("minReplication = " + minReplication);
LOG.info("maxReplicationStreams = " + maxReplicationStreams);
LOG.info("shouldCheckForEnoughRacks = " + shouldCheckForEnoughRacks);
LOG.info("replicationRecheckInterval = " + replicationRecheckInterval);
LOG.info("encryptDataTransfer = " + encryptDataTransfer);
}
private static BlockTokenSecretManager createBlockTokenSecretManager(
@ -311,10 +321,14 @@ public class BlockManager {
final long lifetimeMin = conf.getLong(
DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_LIFETIME_KEY,
DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_LIFETIME_DEFAULT);
final String encryptionAlgorithm = conf.get(
DFSConfigKeys.DFS_DATA_ENCRYPTION_ALGORITHM_KEY);
LOG.info(DFSConfigKeys.DFS_BLOCK_ACCESS_KEY_UPDATE_INTERVAL_KEY
+ "=" + updateMin + " min(s), "
+ DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_LIFETIME_KEY
+ "=" + lifetimeMin + " min(s)");
+ "=" + lifetimeMin + " min(s), "
+ DFSConfigKeys.DFS_DATA_ENCRYPTION_ALGORITHM_KEY
+ "=" + encryptionAlgorithm);
String nsId = DFSUtil.getNamenodeNameServiceId(conf);
boolean isHaEnabled = HAUtil.isHAEnabled(conf, nsId);
@ -323,10 +337,17 @@ public class BlockManager {
String thisNnId = HAUtil.getNameNodeId(conf, nsId);
String otherNnId = HAUtil.getNameNodeIdOfOtherNode(conf, nsId);
return new BlockTokenSecretManager(updateMin*60*1000L,
lifetimeMin*60*1000L, thisNnId.compareTo(otherNnId) < 0 ? 0 : 1);
lifetimeMin*60*1000L, thisNnId.compareTo(otherNnId) < 0 ? 0 : 1, null,
encryptionAlgorithm);
} else {
return new BlockTokenSecretManager(updateMin*60*1000L,
lifetimeMin*60*1000L, 0);
lifetimeMin*60*1000L, 0, null, encryptionAlgorithm);
}
}
public void setBlockPoolId(String blockPoolId) {
if (isBlockTokenEnabled()) {
blockTokenSecretManager.setBlockPoolId(blockPoolId);
}
}
@ -793,6 +814,14 @@ public class BlockManager {
nodeinfo.needKeyUpdate = false;
}
}
public DataEncryptionKey generateDataEncryptionKey() {
if (isBlockTokenEnabled() && encryptDataTransfer) {
return blockTokenSecretManager.generateDataEncryptionKey();
} else {
return null;
}
}
/**
* Clamp the specified replication between the minimum and the maximum

View File

@ -50,6 +50,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
@ -195,7 +196,8 @@ public class JspHelper {
public static void streamBlockInAscii(InetSocketAddress addr, String poolId,
long blockId, Token<BlockTokenIdentifier> blockToken, long genStamp,
long blockSize, long offsetIntoBlock, long chunkSizeToView,
JspWriter out, Configuration conf) throws IOException {
JspWriter out, Configuration conf, DataEncryptionKey encryptionKey)
throws IOException {
if (chunkSizeToView == 0) return;
Socket s = NetUtils.getDefaultSocketFactory(conf).createSocket();
s.connect(addr, HdfsServerConstants.READ_TIMEOUT);
@ -208,7 +210,7 @@ public class JspHelper {
BlockReader blockReader = BlockReaderFactory.newBlockReader(
conf, s, file,
new ExtendedBlock(poolId, blockId, 0, genStamp), blockToken,
offsetIntoBlock, amtToRead);
offsetIntoBlock, amtToRead, encryptionKey);
byte[] buf = new byte[(int)amtToRead];
int readOffset = 0;

View File

@ -33,7 +33,9 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAUL
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MIN_SUPPORTED_NAMENODE_VERSION_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MIN_SUPPORTED_NAMENODE_VERSION_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_ENCRYPTION_ALGORITHM_KEY;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
@ -52,6 +54,7 @@ class DNConf {
final boolean syncBehindWrites;
final boolean dropCacheBehindReads;
final boolean syncOnClose;
final boolean encryptDataTransfer;
final long readaheadLength;
@ -62,6 +65,7 @@ class DNConf {
final int writePacketSize;
final String minimumNameNodeVersion;
final String encryptionAlgorithm;
public DNConf(Configuration conf) {
socketTimeout = conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY,
@ -117,6 +121,10 @@ class DNConf {
this.minimumNameNodeVersion = conf.get(DFS_DATANODE_MIN_SUPPORTED_NAMENODE_VERSION_KEY,
DFS_DATANODE_MIN_SUPPORTED_NAMENODE_VERSION_DEFAULT);
this.encryptDataTransfer = conf.getBoolean(DFS_ENCRYPT_DATA_TRANSFER_KEY,
DFS_ENCRYPT_DATA_TRANSFER_DEFAULT);
this.encryptionAlgorithm = conf.get(DFS_DATA_ENCRYPTION_ALGORITHM_KEY);
}
// We get minimumNameNodeVersion via a method so it can be mocked out in tests.

View File

@ -53,6 +53,7 @@ import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
@ -100,6 +101,8 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor;
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.ClientDatanodeProtocolService;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DNTransferAckProto;
@ -737,8 +740,6 @@ public class DataNode extends Configured
+ " tokens, or none may be.");
}
}
// TODO should we check that all federated nns are either enabled or
// disabled?
if (!isBlockTokenEnabled) return;
if (!blockPoolTokenSecretManager.isBlockPoolRegistered(blockPoolId)) {
@ -750,7 +751,8 @@ public class DataNode extends Configured
+ " min(s), tokenLifetime=" + blockTokenLifetime / (60 * 1000)
+ " min(s)");
final BlockTokenSecretManager secretMgr =
new BlockTokenSecretManager(0, blockTokenLifetime);
new BlockTokenSecretManager(0, blockTokenLifetime, blockPoolId,
dnConf.encryptionAlgorithm);
blockPoolTokenSecretManager.addBlockPool(blockPoolId, secretMgr);
}
}
@ -1390,9 +1392,21 @@ public class DataNode extends Configured
long writeTimeout = dnConf.socketWriteTimeout +
HdfsServerConstants.WRITE_TIMEOUT_EXTENSION * (targets.length-1);
OutputStream baseStream = NetUtils.getOutputStream(sock, writeTimeout);
out = new DataOutputStream(new BufferedOutputStream(baseStream,
OutputStream unbufOut = NetUtils.getOutputStream(sock, writeTimeout);
InputStream unbufIn = NetUtils.getInputStream(sock);
if (dnConf.encryptDataTransfer) {
IOStreamPair encryptedStreams =
DataTransferEncryptor.getEncryptedStreams(
unbufOut, unbufIn,
blockPoolTokenSecretManager.generateDataEncryptionKey(
b.getBlockPoolId()));
unbufOut = encryptedStreams.out;
unbufIn = encryptedStreams.in;
}
out = new DataOutputStream(new BufferedOutputStream(unbufOut,
HdfsConstants.SMALL_BUFFER_SIZE));
in = new DataInputStream(unbufIn);
blockSender = new BlockSender(b, 0, b.getNumBytes(),
false, false, DataNode.this, null);
DatanodeInfo srcNode = new DatanodeInfo(bpReg);
@ -1410,7 +1424,7 @@ public class DataNode extends Configured
stage, 0, 0, 0, 0, blockSender.getChecksum());
// send data & checksum
blockSender.sendBlock(out, baseStream, null);
blockSender.sendBlock(out, unbufOut, null);
// no response necessary
LOG.info(getClass().getSimpleName() + ": Transmitted " + b
@ -1418,7 +1432,6 @@ public class DataNode extends Configured
// read ack
if (isClient) {
in = new DataInputStream(NetUtils.getInputStream(sock));
DNTransferAckProto closeAck = DNTransferAckProto.parseFrom(
HdfsProtoUtil.vintPrefixed(in));
if (LOG.isDebugEnabled()) {

View File

@ -29,6 +29,7 @@ import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
@ -43,7 +44,10 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsProtoUtil;
import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor.InvalidMagicNumberException;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor;
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
import org.apache.hadoop.hdfs.protocol.datatransfer.Op;
import org.apache.hadoop.hdfs.protocol.datatransfer.Receiver;
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
@ -84,7 +88,8 @@ class DataXceiver extends Receiver implements Runnable {
private final DataXceiverServer dataXceiverServer;
private long opStartTime; //the start time of receiving an Op
private final SocketInputWrapper socketInputWrapper;
private final SocketInputWrapper socketIn;
private OutputStream socketOut;
/**
* Client Name used in previous operation. Not available on first request
@ -94,23 +99,19 @@ class DataXceiver extends Receiver implements Runnable {
public static DataXceiver create(Socket s, DataNode dn,
DataXceiverServer dataXceiverServer) throws IOException {
SocketInputWrapper iw = NetUtils.getInputStream(s);
return new DataXceiver(s, iw, dn, dataXceiverServer);
return new DataXceiver(s, dn, dataXceiverServer);
}
private DataXceiver(Socket s,
SocketInputWrapper socketInput,
DataNode datanode,
DataXceiverServer dataXceiverServer) throws IOException {
super(new DataInputStream(new BufferedInputStream(
socketInput, HdfsConstants.SMALL_BUFFER_SIZE)));
this.s = s;
this.socketInputWrapper = socketInput;
this.dnConf = datanode.getDnConf();
this.socketIn = NetUtils.getInputStream(s);
this.socketOut = NetUtils.getOutputStream(s, dnConf.socketWriteTimeout);
this.isLocal = s.getInetAddress().equals(s.getLocalAddress());
this.datanode = datanode;
this.dnConf = datanode.getDnConf();
this.dataXceiverServer = dataXceiverServer;
remoteAddress = s.getRemoteSocketAddress().toString();
localAddress = s.getLocalSocketAddress().toString();
@ -141,6 +142,10 @@ class DataXceiver extends Receiver implements Runnable {
/** Return the datanode object. */
DataNode getDataNode() {return datanode;}
private OutputStream getOutputStream() throws IOException {
return socketOut;
}
/**
* Read/write data from/to the DataXceiverServer.
@ -149,8 +154,31 @@ class DataXceiver extends Receiver implements Runnable {
public void run() {
int opsProcessed = 0;
Op op = null;
dataXceiverServer.childSockets.add(s);
try {
InputStream input = socketIn;
if (dnConf.encryptDataTransfer) {
IOStreamPair encryptedStreams = null;
try {
encryptedStreams = DataTransferEncryptor.getEncryptedStreams(socketOut,
socketIn, datanode.blockPoolTokenSecretManager,
dnConf.encryptionAlgorithm);
} catch (InvalidMagicNumberException imne) {
LOG.info("Failed to read expected encryption handshake from client " +
"at " + s.getInetAddress() + ". Perhaps the client is running an " +
"older version of Hadoop which does not support encryption.");
return;
}
input = encryptedStreams.in;
socketOut = encryptedStreams.out;
}
input = new BufferedInputStream(input, HdfsConstants.SMALL_BUFFER_SIZE);
super.initialize(new DataInputStream(input));
// We process requests in a loop, and stay around for a short timeout.
// This optimistic behaviour allows the other end to reuse connections.
// Setting keepalive timeout to 0 disable this behavior.
@ -160,9 +188,9 @@ class DataXceiver extends Receiver implements Runnable {
try {
if (opsProcessed != 0) {
assert dnConf.socketKeepaliveTimeout > 0;
socketInputWrapper.setTimeout(dnConf.socketKeepaliveTimeout);
socketIn.setTimeout(dnConf.socketKeepaliveTimeout);
} else {
socketInputWrapper.setTimeout(dnConf.socketTimeout);
socketIn.setTimeout(dnConf.socketTimeout);
}
op = readOp();
} catch (InterruptedIOException ignored) {
@ -215,8 +243,7 @@ class DataXceiver extends Receiver implements Runnable {
final long length) throws IOException {
previousOpClientName = clientName;
OutputStream baseStream = NetUtils.getOutputStream(s,
dnConf.socketWriteTimeout);
OutputStream baseStream = getOutputStream();
DataOutputStream out = new DataOutputStream(new BufferedOutputStream(
baseStream, HdfsConstants.SMALL_BUFFER_SIZE));
checkAccess(out, true, block, blockToken,
@ -242,13 +269,12 @@ class DataXceiver extends Receiver implements Runnable {
} catch(IOException e) {
String msg = "opReadBlock " + block + " received exception " + e;
LOG.info(msg);
sendResponse(s, ERROR, msg, dnConf.socketWriteTimeout);
sendResponse(ERROR, msg);
throw e;
}
// send op status
writeSuccessWithChecksumInfo(blockSender,
getStreamWithTimeout(s, dnConf.socketWriteTimeout));
writeSuccessWithChecksumInfo(blockSender, new DataOutputStream(getOutputStream()));
long read = blockSender.sendBlock(out, baseStream, null); // send data
@ -347,7 +373,7 @@ class DataXceiver extends Receiver implements Runnable {
// reply to upstream datanode or client
final DataOutputStream replyOut = new DataOutputStream(
new BufferedOutputStream(
NetUtils.getOutputStream(s, dnConf.socketWriteTimeout),
getOutputStream(),
HdfsConstants.SMALL_BUFFER_SIZE));
checkAccess(replyOut, isClient, block, blockToken,
Op.WRITE_BLOCK, BlockTokenSecretManager.AccessMode.WRITE);
@ -389,11 +415,23 @@ class DataXceiver extends Receiver implements Runnable {
NetUtils.connect(mirrorSock, mirrorTarget, timeoutValue);
mirrorSock.setSoTimeout(timeoutValue);
mirrorSock.setSendBufferSize(HdfsConstants.DEFAULT_DATA_SOCKET_SIZE);
mirrorOut = new DataOutputStream(
new BufferedOutputStream(
NetUtils.getOutputStream(mirrorSock, writeTimeout),
HdfsConstants.SMALL_BUFFER_SIZE));
mirrorIn = new DataInputStream(NetUtils.getInputStream(mirrorSock));
OutputStream unbufMirrorOut = NetUtils.getOutputStream(mirrorSock,
writeTimeout);
InputStream unbufMirrorIn = NetUtils.getInputStream(mirrorSock);
if (dnConf.encryptDataTransfer) {
IOStreamPair encryptedStreams =
DataTransferEncryptor.getEncryptedStreams(
unbufMirrorOut, unbufMirrorIn,
datanode.blockPoolTokenSecretManager
.generateDataEncryptionKey(block.getBlockPoolId()));
unbufMirrorOut = encryptedStreams.out;
unbufMirrorIn = encryptedStreams.in;
}
mirrorOut = new DataOutputStream(new BufferedOutputStream(unbufMirrorOut,
HdfsConstants.SMALL_BUFFER_SIZE));
mirrorIn = new DataInputStream(unbufMirrorIn);
new Sender(mirrorOut).writeBlock(originalBlock, blockToken,
clientname, targets, srcDataNode, stage, pipelineSize,
@ -520,7 +558,7 @@ class DataXceiver extends Receiver implements Runnable {
updateCurrentThreadName(Op.TRANSFER_BLOCK + " " + blk);
final DataOutputStream out = new DataOutputStream(
NetUtils.getOutputStream(s, dnConf.socketWriteTimeout));
getOutputStream());
try {
datanode.transferReplicaForPipelineRecovery(blk, targets, clientName);
writeResponse(Status.SUCCESS, null, out);
@ -533,7 +571,7 @@ class DataXceiver extends Receiver implements Runnable {
public void blockChecksum(final ExtendedBlock block,
final Token<BlockTokenIdentifier> blockToken) throws IOException {
final DataOutputStream out = new DataOutputStream(
NetUtils.getOutputStream(s, dnConf.socketWriteTimeout));
getOutputStream());
checkAccess(out, true, block, blockToken,
Op.BLOCK_CHECKSUM, BlockTokenSecretManager.AccessMode.READ);
updateCurrentThreadName("Reading metadata for block " + block);
@ -593,7 +631,7 @@ class DataXceiver extends Receiver implements Runnable {
LOG.warn("Invalid access token in request from " + remoteAddress
+ " for OP_COPY_BLOCK for block " + block + " : "
+ e.getLocalizedMessage());
sendResponse(s, ERROR_ACCESS_TOKEN, "Invalid access token", dnConf.socketWriteTimeout);
sendResponse(ERROR_ACCESS_TOKEN, "Invalid access token");
return;
}
@ -603,7 +641,7 @@ class DataXceiver extends Receiver implements Runnable {
String msg = "Not able to copy block " + block.getBlockId() + " to "
+ s.getRemoteSocketAddress() + " because threads quota is exceeded.";
LOG.info(msg);
sendResponse(s, ERROR, msg, dnConf.socketWriteTimeout);
sendResponse(ERROR, msg);
return;
}
@ -617,8 +655,7 @@ class DataXceiver extends Receiver implements Runnable {
null);
// set up response stream
OutputStream baseStream = NetUtils.getOutputStream(
s, dnConf.socketWriteTimeout);
OutputStream baseStream = getOutputStream();
reply = new DataOutputStream(new BufferedOutputStream(
baseStream, HdfsConstants.SMALL_BUFFER_SIZE));
@ -670,8 +707,7 @@ class DataXceiver extends Receiver implements Runnable {
LOG.warn("Invalid access token in request from " + remoteAddress
+ " for OP_REPLACE_BLOCK for block " + block + " : "
+ e.getLocalizedMessage());
sendResponse(s, ERROR_ACCESS_TOKEN, "Invalid access token",
dnConf.socketWriteTimeout);
sendResponse(ERROR_ACCESS_TOKEN, "Invalid access token");
return;
}
}
@ -680,7 +716,7 @@ class DataXceiver extends Receiver implements Runnable {
String msg = "Not able to receive block " + block.getBlockId() + " from "
+ s.getRemoteSocketAddress() + " because threads quota is exceeded.";
LOG.warn(msg);
sendResponse(s, ERROR, msg, dnConf.socketWriteTimeout);
sendResponse(ERROR, msg);
return;
}
@ -699,17 +735,29 @@ class DataXceiver extends Receiver implements Runnable {
NetUtils.connect(proxySock, proxyAddr, dnConf.socketTimeout);
proxySock.setSoTimeout(dnConf.socketTimeout);
OutputStream baseStream = NetUtils.getOutputStream(proxySock,
OutputStream unbufProxyOut = NetUtils.getOutputStream(proxySock,
dnConf.socketWriteTimeout);
proxyOut = new DataOutputStream(new BufferedOutputStream(baseStream,
InputStream unbufProxyIn = NetUtils.getInputStream(proxySock);
if (dnConf.encryptDataTransfer) {
IOStreamPair encryptedStreams =
DataTransferEncryptor.getEncryptedStreams(
unbufProxyOut, unbufProxyIn,
datanode.blockPoolTokenSecretManager
.generateDataEncryptionKey(block.getBlockPoolId()));
unbufProxyOut = encryptedStreams.out;
unbufProxyIn = encryptedStreams.in;
}
proxyOut = new DataOutputStream(new BufferedOutputStream(unbufProxyOut,
HdfsConstants.SMALL_BUFFER_SIZE));
proxyReply = new DataInputStream(new BufferedInputStream(unbufProxyIn,
HdfsConstants.IO_FILE_BUFFER_SIZE));
/* send request to the proxy */
new Sender(proxyOut).copyBlock(block, blockToken);
// receive the response from the proxy
proxyReply = new DataInputStream(new BufferedInputStream(
NetUtils.getInputStream(proxySock), HdfsConstants.IO_FILE_BUFFER_SIZE));
BlockOpResponseProto copyResponse = BlockOpResponseProto.parseFrom(
HdfsProtoUtil.vintPrefixed(proxyReply));
@ -762,7 +810,7 @@ class DataXceiver extends Receiver implements Runnable {
// send response back
try {
sendResponse(s, opStatus, errMsg, dnConf.socketWriteTimeout);
sendResponse(opStatus, errMsg);
} catch (IOException ioe) {
LOG.warn("Error writing reply back to " + s.getRemoteSocketAddress());
}
@ -781,20 +829,13 @@ class DataXceiver extends Receiver implements Runnable {
/**
* Utility function for sending a response.
* @param s socket to write to
*
* @param opStatus status message to write
* @param timeout send timeout
**/
private static void sendResponse(Socket s, Status status, String message,
long timeout) throws IOException {
DataOutputStream reply = getStreamWithTimeout(s, timeout);
writeResponse(status, message, reply);
}
private static DataOutputStream getStreamWithTimeout(Socket s, long timeout)
throws IOException {
return new DataOutputStream(NetUtils.getOutputStream(s, timeout));
* @param message message to send to the client or other DN
*/
private void sendResponse(Status status,
String message) throws IOException {
writeResponse(status, message, getOutputStream());
}
private static void writeResponse(Status status, String message, OutputStream out)

View File

@ -606,7 +606,7 @@ public class DatanodeJspHelper {
try {
JspHelper.streamBlockInAscii(new InetSocketAddress(req.getServerName(),
datanodePort), bpid, blockId, blockToken, genStamp, blockSize,
startOffset, chunkSizeToView, out, conf);
startOffset, chunkSizeToView, out, conf, dfs.getDataEncryptionKey());
} catch (Exception e) {
out.print(e);
}
@ -699,7 +699,7 @@ public class DatanodeJspHelper {
out.print("<textarea cols=\"100\" rows=\"25\" wrap=\"virtual\" style=\"width:100%\" READONLY>");
JspHelper.streamBlockInAscii(addr, poolId, blockId, accessToken, genStamp,
blockSize, startOffset, chunkSizeToView, out, conf);
blockSize, startOffset, chunkSizeToView, out, conf, dfs.getDataEncryptionKey());
out.print("</textarea>");
dfs.close();
}

View File

@ -25,6 +25,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAUL
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_STANDBY_CHECKPOINTS_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_STANDBY_CHECKPOINTS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_ACCESSTIME_PRECISION_KEY;
@ -461,7 +463,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
conf.getInt(DFS_BYTES_PER_CHECKSUM_KEY, DFS_BYTES_PER_CHECKSUM_DEFAULT),
conf.getInt(DFS_CLIENT_WRITE_PACKET_SIZE_KEY, DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT),
(short) conf.getInt(DFS_REPLICATION_KEY, DFS_REPLICATION_DEFAULT),
conf.getInt(IO_FILE_BUFFER_SIZE_KEY, IO_FILE_BUFFER_SIZE_DEFAULT));
conf.getInt(IO_FILE_BUFFER_SIZE_KEY, IO_FILE_BUFFER_SIZE_DEFAULT),
conf.getBoolean(DFS_ENCRYPT_DATA_TRANSFER_KEY, DFS_ENCRYPT_DATA_TRANSFER_DEFAULT));
this.maxFsObjects = conf.getLong(DFS_NAMENODE_MAX_OBJECTS_KEY,
DFS_NAMENODE_MAX_OBJECTS_DEFAULT);
@ -2016,6 +2019,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
void setBlockPoolId(String bpid) {
blockPoolId = bpid;
blockManager.setBlockPoolId(blockPoolId);
}
/**

View File

@ -127,7 +127,7 @@ public class FileChecksumServlets {
datanode, conf, getUGI(request, conf));
final ClientProtocol nnproxy = dfs.getNamenode();
final MD5MD5CRC32FileChecksum checksum = DFSClient.getFileChecksum(
path, nnproxy, socketFactory, socketTimeout);
path, nnproxy, socketFactory, socketTimeout, dfs.getDataEncryptionKey());
MD5MD5CRC32FileChecksum.write(xml, checksum);
} catch(IOException ioe) {
writeXml(ioe, path, xml);

View File

@ -84,6 +84,7 @@ import org.apache.hadoop.hdfs.protocolPB.RefreshAuthorizationPolicyProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.RefreshAuthorizationPolicyProtocolServerSideTranslatorPB;
import org.apache.hadoop.hdfs.protocolPB.RefreshUserMappingsProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.RefreshUserMappingsProtocolServerSideTranslatorPB;
import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
@ -1048,4 +1049,9 @@ class NameNodeRpcServer implements NamenodeProtocols {
}
return clientMachine;
}
@Override
public DataEncryptionKey getDataEncryptionKey() throws IOException {
return namesystem.getBlockManager().generateDataEncryptionKey();
}
}

View File

@ -560,7 +560,8 @@ public class NamenodeFsck {
block.getBlockId());
blockReader = BlockReaderFactory.newBlockReader(
conf, s, file, block, lblock
.getBlockToken(), 0, -1);
.getBlockToken(), 0, -1,
namenode.getRpcServer().getDataEncryptionKey());
} catch (IOException ex) {
// Put chosen node into dead list, continue

View File

@ -441,6 +441,12 @@ message SetBalancerBandwidthRequestProto {
message SetBalancerBandwidthResponseProto { // void response
}
message GetDataEncryptionKeyRequestProto { // no parameters
}
message GetDataEncryptionKeyResponseProto {
required DataEncryptionKeyProto dataEncryptionKey = 1;
}
service ClientNamenodeProtocol {
rpc getBlockLocations(GetBlockLocationsRequestProto)
@ -511,6 +517,8 @@ service ClientNamenodeProtocol {
returns(RenewDelegationTokenResponseProto);
rpc cancelDelegationToken(CancelDelegationTokenRequestProto)
returns(CancelDelegationTokenResponseProto);
rpc setBalancerBandwidth(SetBalancerBandwidthRequestProto)
rpc setBalancerBandwidth(SetBalancerBandwidthRequestProto)
returns(SetBalancerBandwidthResponseProto);
rpc getDataEncryptionKey(GetDataEncryptionKeyRequestProto)
returns(GetDataEncryptionKeyResponseProto);
}

View File

@ -25,6 +25,17 @@ option java_generate_equals_and_hash = true;
import "hdfs.proto";
message DataTransferEncryptorMessageProto {
enum DataTransferEncryptorStatus {
SUCCESS = 0;
ERROR_UNKNOWN_KEY = 1;
ERROR = 2;
}
required DataTransferEncryptorStatus status = 1;
optional bytes payload = 2;
optional string message = 3;
}
message BaseHeaderProto {
required ExtendedBlockProto block = 1;
optional BlockTokenIdentifierProto token = 2;

View File

@ -126,7 +126,16 @@ message LocatedBlockProto {
// their locations are not part of this object
required BlockTokenIdentifierProto blockToken = 5;
}
}
message DataEncryptionKeyProto {
required uint32 keyId = 1;
required string blockPoolId = 2;
required bytes nonce = 3;
required bytes encryptionKey = 4;
required uint64 expiryDate = 5;
optional string encryptionAlgorithm = 6;
}
/**
@ -178,6 +187,7 @@ message FsServerDefaultsProto {
required uint32 writePacketSize = 3;
required uint32 replication = 4; // Actually a short - only 16 bits used
required uint32 fileBufferSize = 5;
optional bool encryptDataTransfer = 6 [default = false];
}

View File

@ -1014,4 +1014,25 @@
</description>
</property>
<property>
<name>dfs.encrypt.data.transfer</name>
<value>false</value>
<description>
Whether or not actual block data that is read/written from/to HDFS should
be encrypted on the wire. This only needs to be set on the NN and DNs,
clients will deduce this automatically.
</description>
</property>
<property>
<name>dfs.encrypt.data.transfer.algorithm</name>
<value></value>
<description>
This value may be set to either "3des" or "rc4". If nothing is set, then
the configured JCE default on the system is used (usually 3DES.) It is
widely believed that 3DES is more cryptographically secure, but RC4 is
substantially faster.
</description>
</property>
</configuration>

View File

@ -155,7 +155,7 @@ public class BlockReaderTestUtil {
testBlock.getBlockToken(),
offset, lenToRead,
conf.getInt(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY, 4096),
true, "");
true, "", null, null);
}
/**

View File

@ -60,7 +60,7 @@ public class TestClientBlockVerification {
RemoteBlockReader2 reader = (RemoteBlockReader2)spy(
util.getBlockReader(testBlock, 0, FILE_SIZE_K * 1024));
util.readAndCheckEOS(reader, FILE_SIZE_K * 1024, true);
verify(reader).sendReadResult(reader.dnSock, Status.CHECKSUM_OK);
verify(reader).sendReadResult(Status.CHECKSUM_OK);
reader.close();
}
@ -75,7 +75,7 @@ public class TestClientBlockVerification {
// We asked the blockreader for the whole file, and only read
// half of it, so no CHECKSUM_OK
verify(reader, never()).sendReadResult(reader.dnSock, Status.CHECKSUM_OK);
verify(reader, never()).sendReadResult(Status.CHECKSUM_OK);
reader.close();
}
@ -91,7 +91,7 @@ public class TestClientBlockVerification {
util.getBlockReader(testBlock, 0, FILE_SIZE_K * 1024 / 2));
// And read half the file
util.readAndCheckEOS(reader, FILE_SIZE_K * 1024 / 2, true);
verify(reader).sendReadResult(reader.dnSock, Status.CHECKSUM_OK);
verify(reader).sendReadResult(Status.CHECKSUM_OK);
reader.close();
}
@ -110,7 +110,7 @@ public class TestClientBlockVerification {
RemoteBlockReader2 reader = (RemoteBlockReader2)spy(
util.getBlockReader(testBlock, startOffset, length));
util.readAndCheckEOS(reader, length, true);
verify(reader).sendReadResult(reader.dnSock, Status.CHECKSUM_OK);
verify(reader).sendReadResult(Status.CHECKSUM_OK);
reader.close();
}
}

View File

@ -168,13 +168,13 @@ public class TestConnCache {
// Insert a socket to the NN
Socket nnSock = new Socket(nnAddr.getAddress(), nnAddr.getPort());
cache.put(nnSock);
assertSame("Read the write", nnSock, cache.get(nnAddr));
cache.put(nnSock);
cache.put(nnSock, null);
assertSame("Read the write", nnSock, cache.get(nnAddr).sock);
cache.put(nnSock, null);
// Insert DN socks
for (Socket dnSock : dnSockets) {
cache.put(dnSock);
cache.put(dnSock, null);
}
assertEquals("NN socket evicted", null, cache.get(nnAddr));
@ -182,7 +182,7 @@ public class TestConnCache {
// Lookup the DN socks
for (Socket dnSock : dnSockets) {
assertEquals("Retrieve cached sockets", dnSock, cache.get(dnAddr));
assertEquals("Retrieve cached sockets", dnSock, cache.get(dnAddr).sock);
dnSock.close();
}

View File

@ -113,7 +113,7 @@ public class TestDataTransferKeepalive {
// Take it out of the cache - reading should
// give an EOF.
Socket s = dfsClient.socketCache.get(dnAddr);
Socket s = dfsClient.socketCache.get(dnAddr).sock;
assertNotNull(s);
assertEquals(-1, NetUtils.getInputStream(s).read());
}

View File

@ -0,0 +1,459 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.io.OutputStream;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileChecksum;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.GenericTestUtils.LogCapturer;
import org.junit.Test;
import org.mockito.Mockito;
public class TestEncryptedTransfer {
private static final Log LOG = LogFactory.getLog(TestEncryptedTransfer.class);
private static final String PLAIN_TEXT = "this is very secret plain text";
private static final Path TEST_PATH = new Path("/non-encrypted-file");
private static void setEncryptionConfigKeys(Configuration conf) {
conf.setBoolean(DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY, true);
conf.setBoolean(DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);
}
// Unset DFS_ENCRYPT_DATA_TRANSFER_KEY and DFS_DATA_ENCRYPTION_ALGORITHM_KEY
// on the client side to ensure that clients will detect this setting
// automatically from the NN.
private static FileSystem getFileSystem(Configuration conf) throws IOException {
Configuration localConf = new Configuration(conf);
localConf.setBoolean(DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY, false);
localConf.unset(DFSConfigKeys.DFS_DATA_ENCRYPTION_ALGORITHM_KEY);
return FileSystem.get(localConf);
}
@Test
public void testEncryptedRead() throws IOException {
MiniDFSCluster cluster = null;
try {
Configuration conf = new Configuration();
cluster = new MiniDFSCluster.Builder(conf).build();
FileSystem fs = getFileSystem(conf);
writeTestDataToFile(fs);
assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
FileChecksum checksum = fs.getFileChecksum(TEST_PATH);
fs.close();
cluster.shutdown();
setEncryptionConfigKeys(conf);
cluster = new MiniDFSCluster.Builder(conf)
.manageDataDfsDirs(false)
.manageNameDfsDirs(false)
.format(false)
.startupOption(StartupOption.REGULAR)
.build();
fs = getFileSystem(conf);
assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
assertEquals(checksum, fs.getFileChecksum(TEST_PATH));
fs.close();
} finally {
if (cluster != null) {
cluster.shutdown();
}
}
}
@Test
public void testEncryptedReadWithRC4() throws IOException {
MiniDFSCluster cluster = null;
try {
Configuration conf = new Configuration();
cluster = new MiniDFSCluster.Builder(conf).build();
FileSystem fs = getFileSystem(conf);
writeTestDataToFile(fs);
assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
FileChecksum checksum = fs.getFileChecksum(TEST_PATH);
fs.close();
cluster.shutdown();
setEncryptionConfigKeys(conf);
// It'll use 3DES by default, but we set it to rc4 here.
conf.set(DFSConfigKeys.DFS_DATA_ENCRYPTION_ALGORITHM_KEY, "rc4");
cluster = new MiniDFSCluster.Builder(conf)
.manageDataDfsDirs(false)
.manageNameDfsDirs(false)
.format(false)
.startupOption(StartupOption.REGULAR)
.build();
fs = getFileSystem(conf);
assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
assertEquals(checksum, fs.getFileChecksum(TEST_PATH));
fs.close();
} finally {
if (cluster != null) {
cluster.shutdown();
}
}
}
@Test
public void testEncryptedReadAfterNameNodeRestart() throws IOException {
MiniDFSCluster cluster = null;
try {
Configuration conf = new Configuration();
cluster = new MiniDFSCluster.Builder(conf).build();
FileSystem fs = getFileSystem(conf);
writeTestDataToFile(fs);
assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
FileChecksum checksum = fs.getFileChecksum(TEST_PATH);
fs.close();
cluster.shutdown();
setEncryptionConfigKeys(conf);
cluster = new MiniDFSCluster.Builder(conf)
.manageDataDfsDirs(false)
.manageNameDfsDirs(false)
.format(false)
.startupOption(StartupOption.REGULAR)
.build();
fs = getFileSystem(conf);
assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
assertEquals(checksum, fs.getFileChecksum(TEST_PATH));
fs.close();
cluster.restartNameNode();
fs = getFileSystem(conf);
assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
assertEquals(checksum, fs.getFileChecksum(TEST_PATH));
fs.close();
} finally {
if (cluster != null) {
cluster.shutdown();
}
}
}
@Test
public void testClientThatDoesNotSupportEncryption() throws IOException {
MiniDFSCluster cluster = null;
try {
Configuration conf = new Configuration();
cluster = new MiniDFSCluster.Builder(conf).build();
FileSystem fs = getFileSystem(conf);
writeTestDataToFile(fs);
assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
fs.close();
cluster.shutdown();
setEncryptionConfigKeys(conf);
cluster = new MiniDFSCluster.Builder(conf)
.manageDataDfsDirs(false)
.manageNameDfsDirs(false)
.format(false)
.startupOption(StartupOption.REGULAR)
.build();
fs = getFileSystem(conf);
DFSClient client = DFSClientAdapter.getDFSClient((DistributedFileSystem) fs);
DFSClient spyClient = Mockito.spy(client);
Mockito.doReturn(false).when(spyClient).shouldEncryptData();
DFSClientAdapter.setDFSClient((DistributedFileSystem) fs, spyClient);
LogCapturer logs = GenericTestUtils.LogCapturer.captureLogs(
LogFactory.getLog(DataNode.class));
try {
assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
fail("Should not have been able to read without encryption enabled.");
} catch (IOException ioe) {
GenericTestUtils.assertExceptionContains("Could not obtain block:",
ioe);
} finally {
logs.stopCapturing();
}
fs.close();
GenericTestUtils.assertMatches(logs.getOutput(),
"Failed to read expected encryption handshake from client at");
} finally {
if (cluster != null) {
cluster.shutdown();
}
}
}
@Test
public void testLongLivedReadClientAfterRestart() throws IOException {
MiniDFSCluster cluster = null;
try {
Configuration conf = new Configuration();
cluster = new MiniDFSCluster.Builder(conf).build();
FileSystem fs = getFileSystem(conf);
writeTestDataToFile(fs);
assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
FileChecksum checksum = fs.getFileChecksum(TEST_PATH);
fs.close();
cluster.shutdown();
setEncryptionConfigKeys(conf);
cluster = new MiniDFSCluster.Builder(conf)
.manageDataDfsDirs(false)
.manageNameDfsDirs(false)
.format(false)
.startupOption(StartupOption.REGULAR)
.build();
fs = getFileSystem(conf);
assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
assertEquals(checksum, fs.getFileChecksum(TEST_PATH));
// Restart the NN and DN, after which the client's encryption key will no
// longer be valid.
cluster.restartNameNode();
assertTrue(cluster.restartDataNode(0));
assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
assertEquals(checksum, fs.getFileChecksum(TEST_PATH));
fs.close();
} finally {
if (cluster != null) {
cluster.shutdown();
}
}
}
@Test
public void testLongLivedWriteClientAfterRestart() throws IOException {
MiniDFSCluster cluster = null;
try {
Configuration conf = new Configuration();
setEncryptionConfigKeys(conf);
cluster = new MiniDFSCluster.Builder(conf).build();
FileSystem fs = getFileSystem(conf);
writeTestDataToFile(fs);
assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
// Restart the NN and DN, after which the client's encryption key will no
// longer be valid.
cluster.restartNameNode();
assertTrue(cluster.restartDataNodes());
cluster.waitActive();
writeTestDataToFile(fs);
assertEquals(PLAIN_TEXT + PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
fs.close();
} finally {
if (cluster != null) {
cluster.shutdown();
}
}
}
@Test
public void testLongLivedClient() throws IOException, InterruptedException {
MiniDFSCluster cluster = null;
try {
Configuration conf = new Configuration();
cluster = new MiniDFSCluster.Builder(conf).build();
FileSystem fs = getFileSystem(conf);
writeTestDataToFile(fs);
assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
FileChecksum checksum = fs.getFileChecksum(TEST_PATH);
fs.close();
cluster.shutdown();
setEncryptionConfigKeys(conf);
cluster = new MiniDFSCluster.Builder(conf)
.manageDataDfsDirs(false)
.manageNameDfsDirs(false)
.format(false)
.startupOption(StartupOption.REGULAR)
.build();
BlockTokenSecretManager btsm = cluster.getNamesystem().getBlockManager()
.getBlockTokenSecretManager();
btsm.setKeyUpdateIntervalForTesting(2 * 1000);
btsm.setTokenLifetime(2 * 1000);
btsm.clearAllKeysForTesting();
fs = getFileSystem(conf);
assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
assertEquals(checksum, fs.getFileChecksum(TEST_PATH));
// Sleep for 15 seconds, after which the encryption key will no longer be
// valid. It needs to be a few multiples of the block token lifetime,
// since several block tokens are valid at any given time (the current
// and the last two, by default.)
LOG.info("Sleeping so that encryption keys expire...");
Thread.sleep(15 * 1000);
LOG.info("Done sleeping.");
assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
assertEquals(checksum, fs.getFileChecksum(TEST_PATH));
fs.close();
} finally {
if (cluster != null) {
cluster.shutdown();
}
}
}
@Test
public void testEncryptedWriteWithOneDn() throws IOException {
testEncryptedWrite(1);
}
@Test
public void testEncryptedWriteWithTwoDns() throws IOException {
testEncryptedWrite(2);
}
@Test
public void testEncryptedWriteWithMultipleDns() throws IOException {
testEncryptedWrite(10);
}
private void testEncryptedWrite(int numDns) throws IOException {
MiniDFSCluster cluster = null;
try {
Configuration conf = new Configuration();
setEncryptionConfigKeys(conf);
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDns).build();
FileSystem fs = getFileSystem(conf);
writeTestDataToFile(fs);
assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
fs.close();
} finally {
if (cluster != null) {
cluster.shutdown();
}
}
}
@Test
public void testEncryptedAppend() throws IOException {
MiniDFSCluster cluster = null;
try {
Configuration conf = new Configuration();
setEncryptionConfigKeys(conf);
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
FileSystem fs = getFileSystem(conf);
writeTestDataToFile(fs);
assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
writeTestDataToFile(fs);
assertEquals(PLAIN_TEXT + PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
fs.close();
} finally {
if (cluster != null) {
cluster.shutdown();
}
}
}
@Test
public void testEncryptedAppendRequiringBlockTransfer() throws IOException {
MiniDFSCluster cluster = null;
try {
Configuration conf = new Configuration();
setEncryptionConfigKeys(conf);
// start up 4 DNs
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(4).build();
FileSystem fs = getFileSystem(conf);
// Create a file with replication 3, so its block is on 3 / 4 DNs.
writeTestDataToFile(fs);
assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
// Shut down one of the DNs holding a block replica.
FSDataInputStream in = fs.open(TEST_PATH);
List<LocatedBlock> locatedBlocks = DFSTestUtil.getAllBlocks(in);
in.close();
assertEquals(1, locatedBlocks.size());
assertEquals(3, locatedBlocks.get(0).getLocations().length);
DataNode dn = cluster.getDataNode(locatedBlocks.get(0).getLocations()[0].getIpcPort());
dn.shutdown();
// Reopen the file for append, which will need to add another DN to the
// pipeline and in doing so trigger a block transfer.
writeTestDataToFile(fs);
assertEquals(PLAIN_TEXT + PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
fs.close();
} finally {
if (cluster != null) {
cluster.shutdown();
}
}
}
private static void writeTestDataToFile(FileSystem fs) throws IOException {
OutputStream out = null;
if (!fs.exists(TEST_PATH)) {
out = fs.create(TEST_PATH);
} else {
out = fs.append(TEST_PATH);
}
out.write(PLAIN_TEXT.getBytes());
out.close();
}
}

View File

@ -162,7 +162,7 @@ public class TestBlockToken {
public void testWritable() throws Exception {
TestWritable.testWritable(new BlockTokenIdentifier());
BlockTokenSecretManager sm = new BlockTokenSecretManager(
blockKeyUpdateInterval, blockTokenLifetime, 0);
blockKeyUpdateInterval, blockTokenLifetime, 0, "fake-pool", null);
TestWritable.testWritable(generateTokenId(sm, block1,
EnumSet.allOf(BlockTokenSecretManager.AccessMode.class)));
TestWritable.testWritable(generateTokenId(sm, block2,
@ -201,9 +201,9 @@ public class TestBlockToken {
@Test
public void testBlockTokenSecretManager() throws Exception {
BlockTokenSecretManager masterHandler = new BlockTokenSecretManager(
blockKeyUpdateInterval, blockTokenLifetime, 0);
blockKeyUpdateInterval, blockTokenLifetime, 0, "fake-pool", null);
BlockTokenSecretManager slaveHandler = new BlockTokenSecretManager(
blockKeyUpdateInterval, blockTokenLifetime);
blockKeyUpdateInterval, blockTokenLifetime, "fake-pool", null);
ExportedBlockKeys keys = masterHandler.exportKeys();
slaveHandler.addKeys(keys);
tokenGenerationAndVerification(masterHandler, slaveHandler);
@ -238,7 +238,7 @@ public class TestBlockToken {
@Test
public void testBlockTokenRpc() throws Exception {
BlockTokenSecretManager sm = new BlockTokenSecretManager(
blockKeyUpdateInterval, blockTokenLifetime, 0);
blockKeyUpdateInterval, blockTokenLifetime, 0, "fake-pool", null);
Token<BlockTokenIdentifier> token = sm.generateToken(block3,
EnumSet.allOf(BlockTokenSecretManager.AccessMode.class));
@ -273,7 +273,7 @@ public class TestBlockToken {
public void testBlockTokenRpcLeak() throws Exception {
Assume.assumeTrue(FD_DIR.exists());
BlockTokenSecretManager sm = new BlockTokenSecretManager(
blockKeyUpdateInterval, blockTokenLifetime, 0);
blockKeyUpdateInterval, blockTokenLifetime, 0, "fake-pool", null);
Token<BlockTokenIdentifier> token = sm.generateToken(block3,
EnumSet.allOf(BlockTokenSecretManager.AccessMode.class));
@ -342,9 +342,9 @@ public class TestBlockToken {
for (int i = 0; i < 10; i++) {
String bpid = Integer.toString(i);
BlockTokenSecretManager masterHandler = new BlockTokenSecretManager(
blockKeyUpdateInterval, blockTokenLifetime, 0);
blockKeyUpdateInterval, blockTokenLifetime, 0, "fake-pool", null);
BlockTokenSecretManager slaveHandler = new BlockTokenSecretManager(
blockKeyUpdateInterval, blockTokenLifetime);
blockKeyUpdateInterval, blockTokenLifetime, "fake-pool", null);
bpMgr.addBlockPool(bpid, slaveHandler);
ExportedBlockKeys keys = masterHandler.exportKeys();

View File

@ -396,7 +396,10 @@ public class TestBalancer {
* then a new empty node is added to the cluster*/
@Test
public void testBalancer0() throws Exception {
Configuration conf = new HdfsConfiguration();
testBalancer0Internal(new HdfsConfiguration());
}
void testBalancer0Internal(Configuration conf) throws Exception {
initConf(conf);
oneNodeTest(conf);
twoNodeTest(conf);
@ -405,7 +408,10 @@ public class TestBalancer {
/** Test unevenly distributed cluster */
@Test
public void testBalancer1() throws Exception {
Configuration conf = new HdfsConfiguration();
testBalancer1Internal(new HdfsConfiguration());
}
void testBalancer1Internal(Configuration conf) throws Exception {
initConf(conf);
testUnevenDistribution(conf,
new long[] {50*CAPACITY/100, 10*CAPACITY/100},
@ -415,7 +421,10 @@ public class TestBalancer {
@Test
public void testBalancer2() throws Exception {
Configuration conf = new HdfsConfiguration();
testBalancer2Internal(new HdfsConfiguration());
}
void testBalancer2Internal(Configuration conf) throws Exception {
initConf(conf);
testBalancerDefaultConstructor(conf, new long[] { CAPACITY, CAPACITY },
new String[] { RACK0, RACK1 }, CAPACITY, RACK2);

View File

@ -0,0 +1,51 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.balancer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.junit.Before;
import org.junit.Test;
public class TestBalancerWithEncryptedTransfer {
private Configuration conf = new HdfsConfiguration();
@Before
public void setUpConf() {
conf.setBoolean(DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY, true);
conf.setBoolean(DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);
}
@Test
public void testEncryptedBalancer0() throws Exception {
new TestBalancer().testBalancer0Internal(conf);
}
@Test
public void testEncryptedBalancer1() throws Exception {
new TestBalancer().testBalancer1Internal(conf);
}
@Test
public void testEncryptedBalancer2() throws Exception {
new TestBalancer().testBalancer2Internal(conf);
}
}

View File

@ -146,7 +146,7 @@ public class TestBlockTokenWithDFS {
"test-blockpoolid", block.getBlockId());
blockReader = BlockReaderFactory.newBlockReader(
conf, s, file, block,
lblock.getBlockToken(), 0, -1);
lblock.getBlockToken(), 0, -1, null);
} catch (IOException ex) {
if (ex instanceof InvalidBlockTokenException) {

View File

@ -281,7 +281,7 @@ public class TestDataNodeVolumeFailure {
"test-blockpoolid",
block.getBlockId());
BlockReaderFactory.newBlockReader(conf, s, file, block, lblock
.getBlockToken(), 0, -1);
.getBlockToken(), 0, -1, null);
// nothing - if it fails - it will throw and exception
}