HDFS-13699. Add DFSClient sending handshake token to DataNode, and allow DataNode overwrite downstream QOP. Contributed by Chen Liang.

This commit is contained in:
Chen Liang 2019-04-12 17:37:51 -07:00
parent 3135cc35e3
commit ef9b0a1d9a
14 changed files with 502 additions and 28 deletions

View File

@ -167,7 +167,7 @@ public abstract class SecretManager<T extends TokenIdentifier> {
* @param key the secret key
* @return the bytes of the generated password
*/
protected static byte[] createPassword(byte[] identifier,
public static byte[] createPassword(byte[] identifier,
SecretKey key) {
Mac mac = threadLocalMac.get();
try {

View File

@ -157,6 +157,9 @@ public interface HdfsClientConfigKeys {
String DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY =
"dfs.encrypt.data.transfer.cipher.suites";
String DFS_ENCRYPT_DATA_OVERWRITE_DOWNSTREAM_NEW_QOP_KEY =
"dfs.encrypt.data.overwrite.downstream.new.qop";
String DFS_DATA_TRANSFER_PROTECTION_KEY = "dfs.data.transfer.protection";
String DFS_DATA_TRANSFER_PROTECTION_DEFAULT = "";
String DFS_DATA_TRANSFER_SASL_PROPS_RESOLVER_CLASS_KEY =

View File

@ -49,6 +49,7 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto.DataTransferEncryptorStatus;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.HandshakeSecretProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.CipherOptionProto;
import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
import org.apache.hadoop.security.SaslPropertiesResolver;
@ -249,6 +250,51 @@ public final class DataTransferSaslUtil {
}
}
static class SaslMessageWithHandshake {
private final byte[] payload;
private final byte[] secret;
private final String bpid;
SaslMessageWithHandshake(byte[] payload, byte[] secret, String bpid) {
this.payload = payload;
this.secret = secret;
this.bpid = bpid;
}
byte[] getPayload() {
return payload;
}
byte[] getSecret() {
return secret;
}
String getBpid() {
return bpid;
}
}
public static SaslMessageWithHandshake readSaslMessageWithHandshakeSecret(
InputStream in) throws IOException {
DataTransferEncryptorMessageProto proto =
DataTransferEncryptorMessageProto.parseFrom(vintPrefixed(in));
if (proto.getStatus() == DataTransferEncryptorStatus.ERROR_UNKNOWN_KEY) {
throw new InvalidEncryptionKeyException(proto.getMessage());
} else if (proto.getStatus() == DataTransferEncryptorStatus.ERROR) {
throw new IOException(proto.getMessage());
} else {
byte[] payload = proto.getPayload().toByteArray();
byte[] secret = null;
String bpid = null;
if (proto.hasHandshakeSecret()) {
HandshakeSecretProto handshakeSecret = proto.getHandshakeSecret();
secret = handshakeSecret.getSecret().toByteArray();
bpid = handshakeSecret.getBpid();
}
return new SaslMessageWithHandshake(payload, secret, bpid);
}
}
/**
* Negotiate a cipher option which server supports.
*
@ -375,6 +421,12 @@ public final class DataTransferSaslUtil {
sendSaslMessage(out, DataTransferEncryptorStatus.SUCCESS, payload, null);
}
public static void sendSaslMessageHandshakeSecret(OutputStream out,
byte[] payload, byte[] secret, String bpid) throws IOException {
sendSaslMessageHandshakeSecret(out, DataTransferEncryptorStatus.SUCCESS,
payload, null, secret, bpid);
}
/**
* Send a SASL negotiation message and negotiation cipher options to server.
*
@ -497,6 +549,13 @@ public final class DataTransferSaslUtil {
public static void sendSaslMessage(OutputStream out,
DataTransferEncryptorStatus status, byte[] payload, String message)
throws IOException {
sendSaslMessage(out, status, payload, message, null);
}
public static void sendSaslMessage(OutputStream out,
DataTransferEncryptorStatus status, byte[] payload, String message,
HandshakeSecretProto handshakeSecret)
throws IOException {
DataTransferEncryptorMessageProto.Builder builder =
DataTransferEncryptorMessageProto.newBuilder();
@ -507,12 +566,25 @@ public final class DataTransferSaslUtil {
if (message != null) {
builder.setMessage(message);
}
if (handshakeSecret != null) {
builder.setHandshakeSecret(handshakeSecret);
}
DataTransferEncryptorMessageProto proto = builder.build();
proto.writeDelimitedTo(out);
out.flush();
}
public static void sendSaslMessageHandshakeSecret(OutputStream out,
DataTransferEncryptorStatus status, byte[] payload, String message,
byte[] secret, String bpid) throws IOException {
HandshakeSecretProto.Builder builder =
HandshakeSecretProto.newBuilder();
builder.setSecret(ByteString.copyFrom(secret));
builder.setBpid(bpid);
sendSaslMessage(out, status, payload, message, builder.build());
}
/**
* There is no reason to instantiate this class.
*/

View File

@ -18,8 +18,11 @@
package org.apache.hadoop.hdfs.protocol.datatransfer.sasl;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_ENCRYPT_DATA_OVERWRITE_DOWNSTREAM_NEW_QOP_KEY;
import static org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil.*;
import com.google.common.annotations.VisibleForTesting;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
@ -31,6 +34,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.crypto.SecretKey;
import javax.security.auth.callback.Callback;
import javax.security.auth.callback.CallbackHandler;
import javax.security.auth.callback.NameCallback;
@ -39,6 +43,7 @@ import javax.security.auth.callback.UnsupportedCallbackException;
import javax.security.sasl.RealmCallback;
import javax.security.sasl.RealmChoiceCallback;
import javax.security.sasl.Sasl;
import org.apache.commons.codec.binary.Base64;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
@ -54,6 +59,7 @@ import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
import org.apache.hadoop.security.SaslPropertiesResolver;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.Token;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -83,6 +89,10 @@ public class SaslDataTransferClient {
private final SaslPropertiesResolver saslPropsResolver;
private final TrustedChannelResolver trustedChannelResolver;
// Store the most recent successfully negotiated QOP,
// for testing purpose only
private String targetQOP;
/**
* Creates a new SaslDataTransferClient. This constructor is used in cases
* where it is not relevant to track if a secure client did a fallback to
@ -140,7 +150,7 @@ public class SaslDataTransferClient {
DataEncryptionKey encryptionKey = !trustedChannelResolver.isTrusted() ?
encryptionKeyFactory.newDataEncryptionKey() : null;
IOStreamPair ios = send(socket.getInetAddress(), underlyingOut,
underlyingIn, encryptionKey, accessToken, datanodeId);
underlyingIn, encryptionKey, accessToken, datanodeId, null);
return ios != null ? ios : new IOStreamPair(underlyingIn, underlyingOut);
}
@ -180,8 +190,19 @@ public class SaslDataTransferClient {
InputStream underlyingIn, DataEncryptionKeyFactory encryptionKeyFactory,
Token<BlockTokenIdentifier> accessToken, DatanodeID datanodeId)
throws IOException {
return socketSend(socket, underlyingOut, underlyingIn, encryptionKeyFactory,
accessToken, datanodeId, null);
}
public IOStreamPair socketSend(
Socket socket, OutputStream underlyingOut, InputStream underlyingIn,
DataEncryptionKeyFactory encryptionKeyFactory,
Token<BlockTokenIdentifier> accessToken, DatanodeID datanodeId,
SecretKey secretKey)
throws IOException {
IOStreamPair ios = checkTrustAndSend(socket.getInetAddress(), underlyingOut,
underlyingIn, encryptionKeyFactory, accessToken, datanodeId);
underlyingIn, encryptionKeyFactory, accessToken, datanodeId,
secretKey);
return ios != null ? ios : new IOStreamPair(underlyingIn, underlyingOut);
}
@ -203,17 +224,26 @@ public class SaslDataTransferClient {
DataEncryptionKeyFactory encryptionKeyFactory,
Token<BlockTokenIdentifier> accessToken, DatanodeID datanodeId)
throws IOException {
return checkTrustAndSend(addr, underlyingOut, underlyingIn,
encryptionKeyFactory, accessToken, datanodeId, null);
}
private IOStreamPair checkTrustAndSend(
InetAddress addr, OutputStream underlyingOut, InputStream underlyingIn,
DataEncryptionKeyFactory encryptionKeyFactory,
Token<BlockTokenIdentifier> accessToken, DatanodeID datanodeId,
SecretKey secretKey)
throws IOException {
boolean localTrusted = trustedChannelResolver.isTrusted();
boolean remoteTrusted = trustedChannelResolver.isTrusted(addr);
LOG.debug("SASL encryption trust check: localHostTrusted = {}, "
LOG.info("SASL encryption trust check: localHostTrusted = {}, "
+ "remoteHostTrusted = {}", localTrusted, remoteTrusted);
if (!localTrusted || !remoteTrusted) {
// The encryption key factory only returns a key if encryption is enabled.
DataEncryptionKey encryptionKey = encryptionKeyFactory
.newDataEncryptionKey();
DataEncryptionKey encryptionKey =
encryptionKeyFactory.newDataEncryptionKey();
return send(addr, underlyingOut, underlyingIn, encryptionKey, accessToken,
datanodeId);
datanodeId, secretKey);
} else {
LOG.debug(
"SASL client skipping handshake on trusted connection for addr = {}, "
@ -237,13 +267,14 @@ public class SaslDataTransferClient {
*/
private IOStreamPair send(InetAddress addr, OutputStream underlyingOut,
InputStream underlyingIn, DataEncryptionKey encryptionKey,
Token<BlockTokenIdentifier> accessToken, DatanodeID datanodeId)
Token<BlockTokenIdentifier> accessToken, DatanodeID datanodeId,
SecretKey secretKey)
throws IOException {
if (encryptionKey != null) {
LOG.debug("SASL client doing encrypted handshake for addr = {}, "
+ "datanodeId = {}", addr, datanodeId);
return getEncryptedStreams(addr, underlyingOut, underlyingIn,
encryptionKey);
encryptionKey, accessToken, secretKey);
} else if (!UserGroupInformation.isSecurityEnabled()) {
LOG.debug("SASL client skipping handshake in unsecured configuration for "
+ "addr = {}, datanodeId = {}", addr, datanodeId);
@ -264,7 +295,8 @@ public class SaslDataTransferClient {
LOG.debug(
"SASL client doing general handshake for addr = {}, datanodeId = {}",
addr, datanodeId);
return getSaslStreams(addr, underlyingOut, underlyingIn, accessToken);
return getSaslStreams(addr, underlyingOut, underlyingIn,
accessToken, secretKey);
} else {
// It's a secured cluster using non-privileged ports, but no SASL. The
// only way this can happen is if the DataNode has
@ -287,11 +319,20 @@ public class SaslDataTransferClient {
* @throws IOException for any error
*/
private IOStreamPair getEncryptedStreams(InetAddress addr,
OutputStream underlyingOut,
InputStream underlyingIn, DataEncryptionKey encryptionKey)
OutputStream underlyingOut, InputStream underlyingIn,
DataEncryptionKey encryptionKey,
Token<BlockTokenIdentifier> accessToken,
SecretKey secretKey)
throws IOException {
Map<String, String> saslProps = createSaslPropertiesForEncryption(
encryptionKey.encryptionAlgorithm);
if (secretKey != null) {
LOG.debug("DataNode overwriting downstream QOP" +
saslProps.get(Sasl.QOP));
byte[] newSecret = SecretManager.createPassword(saslProps.get(Sasl.QOP)
.getBytes(Charsets.UTF_8), secretKey);
accessToken.setDNHandshakeSecret(newSecret);
}
LOG.debug("Client using encryption algorithm {}",
encryptionKey.encryptionAlgorithm);
@ -301,7 +342,7 @@ public class SaslDataTransferClient {
CallbackHandler callbackHandler = new SaslClientCallbackHandler(userName,
password);
return doSaslHandshake(addr, underlyingOut, underlyingIn, userName,
saslProps, callbackHandler);
saslProps, callbackHandler, accessToken);
}
/**
@ -370,6 +411,11 @@ public class SaslDataTransferClient {
}
}
@VisibleForTesting
public String getTargetQOP() {
return targetQOP;
}
/**
* Sends client SASL negotiation for general-purpose handshake.
*
@ -382,16 +428,36 @@ public class SaslDataTransferClient {
*/
private IOStreamPair getSaslStreams(InetAddress addr,
OutputStream underlyingOut, InputStream underlyingIn,
Token<BlockTokenIdentifier> accessToken)
Token<BlockTokenIdentifier> accessToken,
SecretKey secretKey)
throws IOException {
Map<String, String> saslProps = saslPropsResolver.getClientProperties(addr);
// secretKey != null only happens when this is called by DN
// sending to downstream DN. If called from client, this will be null,
// as there is no key for client to generate mac instance.
// So that, if a different QOP is desired for inter-DN communication,
// the check below will use new QOP to create a secret, which includes
// the new QOP.
if (secretKey != null) {
String newQOP = conf
.get(DFS_ENCRYPT_DATA_OVERWRITE_DOWNSTREAM_NEW_QOP_KEY);
if (newQOP != null) {
saslProps.put(Sasl.QOP, newQOP);
}
LOG.debug("DataNode overwriting downstream QOP " +
saslProps.get(Sasl.QOP));
byte[] newSecret = SecretManager.createPassword(
saslProps.get(Sasl.QOP).getBytes(Charsets.UTF_8), secretKey);
accessToken.setDNHandshakeSecret(newSecret);
}
targetQOP = saslProps.get(Sasl.QOP);
String userName = buildUserName(accessToken);
char[] password = buildClientPassword(accessToken);
CallbackHandler callbackHandler = new SaslClientCallbackHandler(userName,
password);
return doSaslHandshake(addr, underlyingOut, underlyingIn, userName,
saslProps, callbackHandler);
saslProps, callbackHandler, accessToken);
}
/**
@ -435,8 +501,8 @@ public class SaslDataTransferClient {
*/
private IOStreamPair doSaslHandshake(InetAddress addr,
OutputStream underlyingOut, InputStream underlyingIn, String userName,
Map<String, String> saslProps,
CallbackHandler callbackHandler) throws IOException {
Map<String, String> saslProps, CallbackHandler callbackHandler,
Token<BlockTokenIdentifier> accessToken) throws IOException {
DataOutputStream out = new DataOutputStream(underlyingOut);
DataInputStream in = new DataInputStream(underlyingIn);
@ -449,7 +515,22 @@ public class SaslDataTransferClient {
try {
// Start of handshake - "initial response" in SASL terminology.
sendSaslMessage(out, new byte[0]);
// The handshake secret can be null, this happens when client is running
// a new version but the cluster does not have this feature. In which case
// there will be no encrypted secret sent from NN.
byte[] handshakeSecret = accessToken.getDnHandshakeSecret();
if (handshakeSecret == null || handshakeSecret.length == 0) {
LOG.debug("Handshake secret is null, sending without "
+ "handshake secret.");
sendSaslMessage(out, new byte[0]);
} else {
LOG.debug("Sending handshake secret.");
BlockTokenIdentifier identifier = new BlockTokenIdentifier();
identifier.readFields(new DataInputStream(
new ByteArrayInputStream(accessToken.getIdentifier())));
String bpid = identifier.getBlockPoolId();
sendSaslMessageHandshakeSecret(out, new byte[0], handshakeSecret, bpid);
}
// step 1
byte[] remoteResponse = readSaslMessage(in);

View File

@ -43,6 +43,12 @@ message DataTransferEncryptorMessageProto {
optional bytes payload = 2;
optional string message = 3;
repeated CipherOptionProto cipherOption = 4;
optional HandshakeSecretProto handshakeSecret = 5;
}
message HandshakeSecretProto {
required bytes secret = 1;
required string bpid = 2;
}
message BaseHeaderProto {

View File

@ -956,6 +956,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
// Security-related configs
public static final String DFS_ENCRYPT_DATA_TRANSFER_KEY = "dfs.encrypt.data.transfer";
public static final String DFS_ENCRYPT_DATA_OVERWRITE_DOWNSTREAM_DERIVED_QOP_KEY =
"dfs.encrypt.data.overwrite.downstream.derived.qop";
public static final boolean DFS_ENCRYPT_DATA_OVERWRITE_DOWNSTREAM_DERIVED_QOP_DEFAULT =
false;
public static final boolean DFS_ENCRYPT_DATA_TRANSFER_DEFAULT = false;
public static final String DFS_XFRAME_OPTION_ENABLED = "dfs.xframe.enabled";
public static final boolean DFS_XFRAME_OPTION_ENABLED_DEFAULT = true;

View File

@ -21,15 +21,18 @@ import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATA_TRANSF
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY;
import static org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil.*;
import com.google.common.annotations.VisibleForTesting;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import javax.crypto.SecretKey;
import javax.security.auth.callback.Callback;
import javax.security.auth.callback.CallbackHandler;
import javax.security.auth.callback.NameCallback;
@ -37,6 +40,7 @@ import javax.security.auth.callback.PasswordCallback;
import javax.security.auth.callback.UnsupportedCallbackException;
import javax.security.sasl.AuthorizeCallback;
import javax.security.sasl.RealmCallback;
import javax.security.sasl.Sasl;
import javax.security.sasl.SaslException;
import org.apache.commons.codec.binary.Base64;
@ -48,12 +52,15 @@ import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto.DataTransferEncryptorStatus;
import org.apache.hadoop.hdfs.security.token.block.BlockKey;
import org.apache.hadoop.hdfs.security.token.block.BlockPoolTokenSecretManager;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.datanode.DNConf;
import org.apache.hadoop.security.SaslPropertiesResolver;
import org.apache.hadoop.security.SaslRpcServer;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.SecretManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -78,6 +85,10 @@ public class SaslDataTransferServer {
private final BlockPoolTokenSecretManager blockPoolTokenSecretManager;
private final DNConf dnConf;
// Store the most recent successfully negotiated QOP,
// for testing purpose only
private String negotiatedQOP;
/**
* Creates a new SaslDataTransferServer.
*
@ -337,6 +348,26 @@ public class SaslDataTransferServer {
return identifier;
}
private String examineSecret(byte[] secret, String bpid) {
BlockKey blockKey = blockPoolTokenSecretManager.get(bpid).getCurrentKey();
SecretKey secretKey = blockKey.getKey();
for (SaslRpcServer.QualityOfProtection qop :
SaslRpcServer.QualityOfProtection.values()) {
String qopString = qop.getSaslQop();
byte[] data = qopString.getBytes(Charsets.UTF_8);
byte[] encryptedData = SecretManager.createPassword(data, secretKey);
if (Arrays.equals(encryptedData, secret)) {
return qopString;
}
}
return null;
}
@VisibleForTesting
public String getNegotiatedQOP() {
return negotiatedQOP;
}
/**
* This method actually executes the server-side SASL handshake.
*
@ -355,9 +386,6 @@ public class SaslDataTransferServer {
DataInputStream in = new DataInputStream(underlyingIn);
DataOutputStream out = new DataOutputStream(underlyingOut);
SaslParticipant sasl = SaslParticipant.createServerSaslParticipant(saslProps,
callbackHandler);
int magicNumber = in.readInt();
if (magicNumber != SASL_TRANSFER_MAGIC_NUMBER) {
throw new InvalidMagicNumberException(magicNumber,
@ -365,7 +393,23 @@ public class SaslDataTransferServer {
}
try {
// step 1
byte[] remoteResponse = readSaslMessage(in);
SaslMessageWithHandshake message = readSaslMessageWithHandshakeSecret(in);
byte[] secret = message.getSecret();
String bpid = message.getBpid();
if (secret != null || bpid != null) {
// sanity check, if one is null, the other must also not be null
assert(secret != null && bpid != null);
String qop = examineSecret(secret, bpid);
if (qop != null) {
saslProps.put(Sasl.QOP, qop);
} else {
LOG.error("Unable to match secret to a QOP!");
}
}
SaslParticipant sasl = SaslParticipant.createServerSaslParticipant(
saslProps, callbackHandler);
byte[] remoteResponse = message.getPayload();
byte[] localResponse = sasl.evaluateChallengeOrResponse(remoteResponse);
sendSaslMessage(out, localResponse);
@ -379,6 +423,7 @@ public class SaslDataTransferServer {
checkSaslComplete(sasl, saslProps);
CipherOption cipherOption = null;
negotiatedQOP = sasl.getNegotiatedQop();
if (sasl.isNegotiatedQopPrivacy()) {
// Negotiate a cipher option
Configuration conf = dnConf.getConf();

View File

@ -32,6 +32,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_NON_LOCAL_LAZY_P
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_NON_LOCAL_LAZY_PERSIST_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_OUTLIERS_REPORT_INTERVAL_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_OUTLIERS_REPORT_INTERVAL_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_OVERWRITE_DOWNSTREAM_DERIVED_QOP_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_OVERWRITE_DOWNSTREAM_DERIVED_QOP_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY;
@ -89,6 +91,7 @@ public class DNConf {
final boolean syncOnClose;
final boolean encryptDataTransfer;
final boolean connectToDnViaHostname;
final boolean overwriteDownstreamDerivedQOP;
final long readaheadLength;
final long heartBeatInterval;
@ -239,6 +242,9 @@ public class DNConf {
this.encryptDataTransfer = getConf().getBoolean(
DFS_ENCRYPT_DATA_TRANSFER_KEY,
DFS_ENCRYPT_DATA_TRANSFER_DEFAULT);
this.overwriteDownstreamDerivedQOP = getConf().getBoolean(
DFS_ENCRYPT_DATA_OVERWRITE_DOWNSTREAM_DERIVED_QOP_KEY,
DFS_ENCRYPT_DATA_OVERWRITE_DOWNSTREAM_DERIVED_QOP_DEFAULT);
this.encryptionAlgorithm = getConf().get(DFS_DATA_ENCRYPTION_ALGORITHM_KEY);
this.trustedChannelResolver = TrustedChannelResolver.getInstance(getConf());
this.saslPropsResolver = DataTransferSaslUtil.getSaslPropertiesResolver(

View File

@ -1781,7 +1781,12 @@ public class DataNode extends ReconfigurableBase
public int getXferPort() {
return streamingAddr.getPort();
}
@VisibleForTesting
public SaslDataTransferServer getSaslServer() {
return saslServer;
}
/**
* @return name useful for logging
*/

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.datanode;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.protobuf.ByteString;
import javax.crypto.SecretKey;
import org.apache.commons.logging.Log;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSUtilClient;
@ -47,6 +48,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReleaseShortCirc
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
import org.apache.hadoop.hdfs.security.token.block.BlockKey;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.datanode.BlockChecksumHelper.BlockChecksumComputer;
import org.apache.hadoop.hdfs.server.datanode.BlockChecksumHelper.AbstractBlockChecksumComputer;
@ -798,8 +800,16 @@ class DataXceiver extends Receiver implements Runnable {
InputStream unbufMirrorIn = NetUtils.getInputStream(mirrorSock);
DataEncryptionKeyFactory keyFactory =
datanode.getDataEncryptionKeyFactoryForBlock(block);
IOStreamPair saslStreams = datanode.saslClient.socketSend(mirrorSock,
unbufMirrorOut, unbufMirrorIn, keyFactory, blockToken, targets[0]);
SecretKey secretKey = null;
if (dnConf.overwriteDownstreamDerivedQOP) {
String bpid = block.getBlockPoolId();
BlockKey blockKey = datanode.blockPoolTokenSecretManager
.get(bpid).getCurrentKey();
secretKey = blockKey.getKey();
}
IOStreamPair saslStreams = datanode.saslClient.socketSend(
mirrorSock, unbufMirrorOut, unbufMirrorIn, keyFactory,
blockToken, targets[0], secretKey);
unbufMirrorOut = saslStreams.out;
unbufMirrorIn = saslStreams.in;
mirrorOut = new DataOutputStream(new BufferedOutputStream(unbufMirrorOut,

View File

@ -5066,6 +5066,28 @@
</description>
</property>
<property>
<name>dfs.encrypt.data.overwrite.downstream.derived.qop</name>
<value>false</value>
<description>
A boolean specifies whether DN should overwrite the downstream
QOP in a write pipeline. This is used in the case where client
talks to first DN with a QOP, but inter-DN communication needs to be
using a different QOP. If set to false, the default behaviour is that
inter-DN communication will use the same QOP as client-DN connection.
</description>
</property>
<property>
<name>dfs.encrypt.data.overwrite.downstream.new.qop</name>
<value></value>
<description>
When dfs.datanode.overwrite.downstream.derived.qop is set to true,
this configuration specifies the new QOP to be used to overwrite
inter-DN QOP.
</description>
</property>
<property>
<name>dfs.namenode.blockreport.queue.size</name>
<value>1024</value>

View File

@ -37,7 +37,7 @@ import static org.junit.Assert.assertTrue;
*/
public class TestHAAuxiliaryPort {
@Test
public void testTest() throws Exception {
public void testHAAuxiliaryPort() throws Exception {
Configuration conf = new Configuration();
conf.set(DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY, "0,0");
conf.set(DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY + ".ha-nn-uri-0.nn1",

View File

@ -0,0 +1,219 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
import java.net.URI;
import java.util.ArrayList;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileSystemTestHelper;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient;
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferServer;
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferTestCase;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.junit.Before;
import org.junit.Test;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_RPC_PROTECTION;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_OVERWRITE_DOWNSTREAM_DERIVED_QOP_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SEND_QOP_ENABLED;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_ENCRYPT_DATA_OVERWRITE_DOWNSTREAM_NEW_QOP_KEY;
import static org.junit.Assert.*;
/**
* This test tests access NameNode on different port with different
* configured QOP.
*/
public class TestMultipleNNPortQOP extends SaslDataTransferTestCase {
private static final Path PATH1 = new Path("/file1");
private static final Path PATH2 = new Path("/file2");
private static final Path PATH3 = new Path("/file3");
private static final int BLOCK_SIZE = 4096;
private static final int NUM_BLOCKS = 3;
private static HdfsConfiguration clusterConf;
@Before
public void setup() throws Exception {
clusterConf = createSecureConfig(
"authentication,integrity,privacy");
clusterConf.set(DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY,
"12000,12100,12200");
// explicitly setting service rpc for datanode. This because
// DFSUtil.getNNServiceRpcAddressesForCluster looks up client facing port
// and service port at the same time, and if no setting for service
// rpc, it would return client port, in this case, it will be the
// auxiliary port for data node. Which is not what auxiliary is for.
// setting service rpc port to avoid this.
clusterConf.set(DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, "localhost:9020");
clusterConf.set(
CommonConfigurationKeys.HADOOP_SECURITY_SASL_PROPS_RESOLVER_CLASS,
"org.apache.hadoop.security.IngressPortBasedResolver");
clusterConf.set("ingress.port.sasl.configured.ports", "12000,12100,12200");
clusterConf.set("ingress.port.sasl.prop.12000", "authentication");
clusterConf.set("ingress.port.sasl.prop.12100", "integrity");
clusterConf.set("ingress.port.sasl.prop.12200", "privacy");
clusterConf.setBoolean(DFS_NAMENODE_SEND_QOP_ENABLED, true);
}
/**
* Test accessing NameNode from three different ports.
*
* @throws Exception
*/
@Test
public void testMultipleNNPort() throws Exception {
MiniDFSCluster cluster = null;
try {
cluster = new MiniDFSCluster.Builder(clusterConf)
.numDataNodes(3).build();
cluster.waitActive();
HdfsConfiguration clientConf = new HdfsConfiguration(clusterConf);
clientConf.unset(
CommonConfigurationKeys.HADOOP_SECURITY_SASL_PROPS_RESOLVER_CLASS);
ArrayList<DataNode> dataNodes = cluster.getDataNodes();
URI currentURI = cluster.getURI();
URI uriAuthPort = new URI(currentURI.getScheme() +
"://" + currentURI.getHost() + ":12000");
URI uriIntegrityPort = new URI(currentURI.getScheme() +
"://" + currentURI.getHost() + ":12100");
URI uriPrivacyPort = new URI(currentURI.getScheme() +
"://" + currentURI.getHost() + ":12200");
clientConf.set(HADOOP_RPC_PROTECTION, "privacy");
FileSystem fsPrivacy = FileSystem.get(uriPrivacyPort, clientConf);
doTest(fsPrivacy, PATH1);
for (DataNode dn : dataNodes) {
SaslDataTransferServer saslServer = dn.getSaslServer();
assertEquals("auth-conf", saslServer.getNegotiatedQOP());
}
clientConf.set(HADOOP_RPC_PROTECTION, "integrity");
FileSystem fsIntegrity = FileSystem.get(uriIntegrityPort, clientConf);
doTest(fsIntegrity, PATH2);
for (DataNode dn : dataNodes) {
SaslDataTransferServer saslServer = dn.getSaslServer();
assertEquals("auth-int", saslServer.getNegotiatedQOP());
}
clientConf.set(HADOOP_RPC_PROTECTION, "authentication");
FileSystem fsAuth = FileSystem.get(uriAuthPort, clientConf);
doTest(fsAuth, PATH3);
for (DataNode dn : dataNodes) {
SaslDataTransferServer saslServer = dn.getSaslServer();
assertEquals("auth", saslServer.getNegotiatedQOP());
}
} finally {
if (cluster != null) {
cluster.shutdown();
}
}
}
/**
* Test accessing NameNode from three different ports, tests
* overwriting downstream DN in the pipeline.
*
* @throws Exception
*/
@Test
public void testMultipleNNPortOverwriteDownStream() throws Exception {
clusterConf.set(DFS_ENCRYPT_DATA_OVERWRITE_DOWNSTREAM_NEW_QOP_KEY, "auth");
clusterConf.setBoolean(
DFS_ENCRYPT_DATA_OVERWRITE_DOWNSTREAM_DERIVED_QOP_KEY, true);
MiniDFSCluster cluster = null;
try {
cluster =
new MiniDFSCluster.Builder(clusterConf).numDataNodes(3).build();
cluster.waitActive();
HdfsConfiguration clientConf = new HdfsConfiguration(clusterConf);
clientConf.unset(
CommonConfigurationKeys.HADOOP_SECURITY_SASL_PROPS_RESOLVER_CLASS);
ArrayList<DataNode> dataNodes = cluster.getDataNodes();
URI currentURI = cluster.getURI();
URI uriAuthPort =
new URI(currentURI.getScheme() + "://" +
currentURI.getHost() + ":12000");
URI uriIntegrityPort =
new URI(currentURI.getScheme() + "://" +
currentURI.getHost() + ":12100");
URI uriPrivacyPort =
new URI(currentURI.getScheme() + "://" +
currentURI.getHost() + ":12200");
clientConf.set(HADOOP_RPC_PROTECTION, "privacy");
FileSystem fsPrivacy = FileSystem.get(uriPrivacyPort, clientConf);
doTest(fsPrivacy, PATH1);
// add a wait so that data has reached not only first DN,
// but also the rest
Thread.sleep(100);
for (int i = 0; i < 2; i++) {
DataNode dn = dataNodes.get(i);
SaslDataTransferClient saslClient = dn.getSaslClient();
assertEquals("auth", saslClient.getTargetQOP());
}
clientConf.set(HADOOP_RPC_PROTECTION, "integrity");
FileSystem fsIntegrity = FileSystem.get(uriIntegrityPort, clientConf);
doTest(fsIntegrity, PATH2);
Thread.sleep(100);
for (int i = 0; i < 2; i++) {
DataNode dn = dataNodes.get(i);
SaslDataTransferClient saslClient = dn.getSaslClient();
assertEquals("auth", saslClient.getTargetQOP());
}
clientConf.set(HADOOP_RPC_PROTECTION, "authentication");
FileSystem fsAuth = FileSystem.get(uriAuthPort, clientConf);
doTest(fsAuth, PATH3);
Thread.sleep(100);
for (int i = 0; i < 3; i++) {
DataNode dn = dataNodes.get(i);
SaslDataTransferServer saslServer = dn.getSaslServer();
assertEquals("auth", saslServer.getNegotiatedQOP());
}
} finally {
if (cluster != null) {
cluster.shutdown();
}
}
}
private void doTest(FileSystem fs, Path path) throws Exception {
FileSystemTestHelper.createFile(fs, path, NUM_BLOCKS, BLOCK_SIZE);
assertArrayEquals(FileSystemTestHelper.getFileData(NUM_BLOCKS, BLOCK_SIZE),
DFSTestUtil.readFile(fs, path).getBytes("UTF-8"));
BlockLocation[] blockLocations = fs.getFileBlockLocations(path, 0,
Long.MAX_VALUE);
assertNotNull(blockLocations);
assertEquals(NUM_BLOCKS, blockLocations.length);
for (BlockLocation blockLocation: blockLocations) {
assertNotNull(blockLocation.getHosts());
assertEquals(3, blockLocation.getHosts().length);
}
}
}

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.hdfs.server.datanode;
import com.google.protobuf.ByteString;
import javax.crypto.SecretKey;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.net.*;
@ -87,7 +88,7 @@ public class TestDataXceiverBackwardsCompat {
doReturn(pair).when(saslClient).socketSend(
any(Socket.class), any(OutputStream.class), any(InputStream.class),
any(DataEncryptionKeyFactory.class), any(Token.class),
any(DatanodeID.class));
any(DatanodeID.class), any(SecretKey.class));
doReturn(mock(ReplicaHandler.class)).when(data).createTemporary(
any(StorageType.class), any(String.class), any(ExtendedBlock.class),
anyBoolean());