HDFS-6606. Optimize HDFS Encrypted Transport performance. (yliu)

This commit is contained in:
yliu 2014-10-28 21:11:31 +08:00
parent c9bec46c92
commit 58c0bb9ed9
18 changed files with 671 additions and 42 deletions

View File

@@ -0,0 +1,66 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.crypto;
import org.apache.hadoop.classification.InterfaceAudience;
/**
* Used between client and server to negotiate the
* cipher suite, key and iv.
*/
@InterfaceAudience.Private
public class CipherOption {
private final CipherSuite suite;
private final byte[] inKey;
private final byte[] inIv;
private final byte[] outKey;
private final byte[] outIv;
public CipherOption(CipherSuite suite) {
this(suite, null, null, null, null);
}
public CipherOption(CipherSuite suite, byte[] inKey, byte[] inIv,
byte[] outKey, byte[] outIv) {
this.suite = suite;
this.inKey = inKey;
this.inIv = inIv;
this.outKey = outKey;
this.outIv = outIv;
}
public CipherSuite getCipherSuite() {
return suite;
}
public byte[] getInKey() {
return inKey;
}
public byte[] getInIv() {
return inIv;
}
public byte[] getOutKey() {
return outKey;
}
public byte[] getOutIv() {
return outIv;
}
}
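
As a quick orientation for the rest of the patch, here is a minimal sketch of how this container is used during negotiation (the key and IV values below are zeroed placeholders, not something this class produces):

import org.apache.hadoop.crypto.CipherOption;
import org.apache.hadoop.crypto.CipherSuite;

public class CipherOptionSketch {
  public static void main(String[] args) {
    // Client side: propose a suite, carrying no key/iv material yet.
    CipherOption proposal = new CipherOption(CipherSuite.AES_CTR_NOPADDING);

    // Server side: accept the suite and attach per-direction key/iv pairs.
    // Placeholders here; the real server fills these from a secure RNG.
    byte[] inKey = new byte[16], inIv = new byte[16];
    byte[] outKey = new byte[16], outIv = new byte[16];
    CipherOption negotiated = new CipherOption(
        proposal.getCipherSuite(), inKey, inIv, outKey, outIv);

    System.out.println("Negotiated: " + negotiated.getCipherSuite().getName());
  }
}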

View File

@@ -23,6 +23,7 @@ import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import java.security.GeneralSecurityException;
import java.util.EnumSet;
import java.util.Queue;
@@ -57,7 +58,8 @@ import com.google.common.base.Preconditions;
@InterfaceStability.Evolving
public class CryptoInputStream extends FilterInputStream implements
Seekable, PositionedReadable, ByteBufferReadable, HasFileDescriptor,
CanSetDropBehind, CanSetReadahead, HasEnhancedByteBufferAccess {
CanSetDropBehind, CanSetReadahead, HasEnhancedByteBufferAccess,
ReadableByteChannel {
private static final byte[] oneByteBuf = new byte[1];
private final CryptoCodec codec;
private final Decryptor decryptor;
@@ -92,6 +94,8 @@ public class CryptoInputStream extends FilterInputStream implements
private final byte[] key;
private final byte[] initIV;
private byte[] iv;
private final boolean isByteBufferReadable;
private final boolean isReadableByteChannel;
/** DirectBuffer pool */
private final Queue<ByteBuffer> bufferPool =
@@ -115,6 +119,8 @@ public class CryptoInputStream extends FilterInputStream implements
this.initIV = iv.clone();
this.iv = iv.clone();
this.streamOffset = streamOffset;
isByteBufferReadable = in instanceof ByteBufferReadable;
isReadableByteChannel = in instanceof ReadableByteChannel;
inBuffer = ByteBuffer.allocateDirect(this.bufferSize);
outBuffer = ByteBuffer.allocateDirect(this.bufferSize);
decryptor = getDecryptor();
@@ -165,9 +171,11 @@ public class CryptoInputStream extends FilterInputStream implements
* it can avoid a byte array copy.
*/
if (usingByteBufferRead == null) {
if (in instanceof ByteBufferReadable) {
if (isByteBufferReadable || isReadableByteChannel) {
try {
n = ((ByteBufferReadable) in).read(inBuffer);
n = isByteBufferReadable ?
((ByteBufferReadable) in).read(inBuffer) :
((ReadableByteChannel) in).read(inBuffer);
usingByteBufferRead = Boolean.TRUE;
} catch (UnsupportedOperationException e) {
usingByteBufferRead = Boolean.FALSE;
@@ -180,7 +188,8 @@ public class CryptoInputStream extends FilterInputStream implements
}
} else {
if (usingByteBufferRead) {
n = ((ByteBufferReadable) in).read(inBuffer);
n = isByteBufferReadable ? ((ByteBufferReadable) in).read(inBuffer) :
((ReadableByteChannel) in).read(inBuffer);
} else {
n = readFromUnderlyingStream(inBuffer);
}
@@ -450,7 +459,7 @@ public class CryptoInputStream extends FilterInputStream implements
@Override
public int read(ByteBuffer buf) throws IOException {
checkStream();
if (in instanceof ByteBufferReadable) {
if (isByteBufferReadable || isReadableByteChannel) {
final int unread = outBuffer.remaining();
if (unread > 0) { // Have unread decrypted data in buffer.
int toRead = buf.remaining();
@@ -466,7 +475,8 @@ public class CryptoInputStream extends FilterInputStream implements
}
final int pos = buf.position();
final int n = ((ByteBufferReadable) in).read(buf);
final int n = isByteBufferReadable ? ((ByteBufferReadable) in).read(buf) :
((ReadableByteChannel) in).read(buf);
if (n > 0) {
streamOffset += n; // Read n bytes
decrypt(buf, n, pos);
@@ -481,10 +491,22 @@ public class CryptoInputStream extends FilterInputStream implements
return unread;
}
}
} else {
int n = 0;
if (buf.hasArray()) {
n = read(buf.array(), buf.position(), buf.remaining());
if (n > 0) {
buf.position(buf.position() + n);
}
} else {
byte[] tmp = new byte[buf.remaining()];
n = read(tmp);
if (n > 0) {
buf.put(tmp, 0, n);
}
}
return n;
}
throw new UnsupportedOperationException("ByteBuffer read unsupported " +
"by input stream.");
}
/**
@@ -686,4 +708,9 @@ public class CryptoInputStream extends FilterInputStream implements
decryptorPool.add(decryptor);
}
}
@Override
public boolean isOpen() {
return !closed;
}
}
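
With CryptoInputStream now implementing ReadableByteChannel, channel-oriented callers can pull decrypted bytes into a direct buffer without a byte-array detour. A minimal sketch, assuming a 16-byte AES key and IV and the four-argument constructor used elsewhere in this patch:

import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.CipherSuite;
import org.apache.hadoop.crypto.CryptoCodec;
import org.apache.hadoop.crypto.CryptoInputStream;

public class ChannelReadSketch {
  // Drains decrypted bytes through the stream's channel interface.
  // key must be 16/24/32 bytes and iv 16 bytes for AES/CTR/NoPadding.
  static long drain(InputStream encrypted, byte[] key, byte[] iv,
      Configuration conf) throws IOException {
    CryptoCodec codec =
        CryptoCodec.getInstance(conf, CipherSuite.AES_CTR_NOPADDING);
    ReadableByteChannel channel =
        new CryptoInputStream(encrypted, codec, key, iv);
    ByteBuffer buf = ByteBuffer.allocateDirect(8192);
    long total = 0;
    int n;
    while ((n = channel.read(buf)) > 0) {
      total += n;  // decrypted data now sits in buf
      buf.clear(); // a real caller would consume it before clearing
    }
    return total;
  }
}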

View File

@@ -663,6 +663,8 @@ Release 2.6.0 - UNRELEASED
HDFS-7122. Use of ThreadLocal<Random> results in poor block placement.
(wang)
HDFS-6606. Optimize HDFS Encrypted Transport performance. (yliu)
BUG FIXES
HDFS-6823. dfs.web.authentication.kerberos.principal shows up in logs for

View File

@@ -686,7 +686,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
this.initThreadsNumForHedgedReads(numThreads);
}
this.saslClient = new SaslDataTransferClient(
DataTransferSaslUtil.getSaslPropertiesResolver(conf),
conf, DataTransferSaslUtil.getSaslPropertiesResolver(conf),
TrustedChannelResolver.getInstance(conf), nnFallbackToSimpleAuth);
}

View File

@@ -600,6 +600,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
// Security-related configs
public static final String DFS_ENCRYPT_DATA_TRANSFER_KEY = "dfs.encrypt.data.transfer";
public static final boolean DFS_ENCRYPT_DATA_TRANSFER_DEFAULT = false;
public static final String DFS_ENCRYPT_DATA_TRANSFER_CIPHER_KEY_BITLENGTH_KEY = "dfs.encrypt.data.transfer.cipher.key.bitlength";
public static final int DFS_ENCRYPT_DATA_TRANSFER_CIPHER_KEY_BITLENGTH_DEFAULT = 128;
public static final String DFS_DATA_ENCRYPTION_ALGORITHM_KEY = "dfs.encrypt.data.transfer.algorithm";
public static final String DFS_TRUSTEDCHANNEL_RESOLVER_CLASS = "dfs.trustedchannel.resolver.class";
public static final String DFS_DATA_TRANSFER_PROTECTION_KEY = "dfs.data.transfer.protection";

View File

@@ -21,6 +21,8 @@ import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_RPC_PROT
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_SASL_PROPS_RESOLVER_CLASS;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_TRANSFER_SASL_PROPS_RESOLVER_CLASS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_KEY_BITLENGTH_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_KEY_BITLENGTH_DEFAULT;
import static org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed;
import java.io.IOException;
@@ -28,6 +30,7 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetAddress;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.security.sasl.Sasl;
@@ -35,10 +38,18 @@
import org.apache.commons.codec.binary.Base64;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.CipherOption;
import org.apache.hadoop.crypto.CipherSuite;
import org.apache.hadoop.crypto.CryptoCodec;
import org.apache.hadoop.crypto.CryptoInputStream;
import org.apache.hadoop.crypto.CryptoOutputStream;
import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto.DataTransferEncryptorStatus;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.CipherOptionProto;
import org.apache.hadoop.hdfs.protocolPB.PBHelper;
import org.apache.hadoop.security.SaslPropertiesResolver;
import org.apache.hadoop.security.SaslRpcServer.QualityOfProtection;
import org.slf4j.Logger;
@@ -95,6 +106,19 @@ public final class DataTransferSaslUtil {
"requested = %s, negotiated = %s", requestedQop, negotiatedQop));
}
}
/**
* Check whether the requested SASL QOP contains privacy.
*
* @param saslProps properties of SASL negotiation
* @return boolean true if privacy (auth-conf) was requested
*/
public static boolean requestedQopContainsPrivacy(
Map<String, String> saslProps) {
Set<String> requestedQop = ImmutableSet.copyOf(Arrays.asList(
saslProps.get(Sasl.QOP).split(",")));
return requestedQop.contains("auth-conf");
}
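
For illustration, a minimal sketch of the new check (the QOP string is a typical "privacy preferred, integrity acceptable" value):

import java.util.HashMap;
import java.util.Map;
import javax.security.sasl.Sasl;

import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil;

public class QopCheckSketch {
  public static void main(String[] args) {
    Map<String, String> saslProps = new HashMap<>();
    saslProps.put(Sasl.QOP, "auth-conf,auth-int");

    // Prints true: "auth-conf" (privacy) is among the requested QOPs,
    // so the client will propose cipher options during the handshake.
    System.out.println(
        DataTransferSaslUtil.requestedQopContainsPrivacy(saslProps));
  }
}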
/**
* Creates SASL properties required for an encrypted SASL negotiation.
@@ -176,20 +200,6 @@ public final class DataTransferSaslUtil {
return resolver;
}
/**
* Performs the first step of SASL negotiation.
*
* @param out connection output stream
* @param in connection input stream
* @param sasl participant
*/
public static void performSaslStep1(OutputStream out, InputStream in,
SaslParticipant sasl) throws IOException {
byte[] remoteResponse = readSaslMessage(in);
byte[] localResponse = sasl.evaluateChallengeOrResponse(remoteResponse);
sendSaslMessage(out, localResponse);
}
/**
* Reads a SASL negotiation message.
*
@@ -208,6 +218,124 @@
return proto.getPayload().toByteArray();
}
}
/**
* Reads a SASL negotiation message and negotiation cipher options.
*
* @param in stream to read
* @param cipherOptions list to store negotiation cipher options
* @return byte[] SASL negotiation message
* @throws IOException for any error
*/
public static byte[] readSaslMessageAndNegotiationCipherOptions(
InputStream in, List<CipherOption> cipherOptions) throws IOException {
DataTransferEncryptorMessageProto proto =
DataTransferEncryptorMessageProto.parseFrom(vintPrefixed(in));
if (proto.getStatus() == DataTransferEncryptorStatus.ERROR_UNKNOWN_KEY) {
throw new InvalidEncryptionKeyException(proto.getMessage());
} else if (proto.getStatus() == DataTransferEncryptorStatus.ERROR) {
throw new IOException(proto.getMessage());
} else {
List<CipherOptionProto> optionProtos = proto.getCipherOptionList();
if (optionProtos != null) {
for (CipherOptionProto optionProto : optionProtos) {
cipherOptions.add(PBHelper.convert(optionProto));
}
}
return proto.getPayload().toByteArray();
}
}
/**
* Negotiate a cipher option that the server supports.
*
* @param options the cipher options which client supports
* @return CipherOption negotiated cipher option
*/
public static CipherOption negotiateCipherOption(Configuration conf,
List<CipherOption> options) {
if (options != null) {
for (CipherOption option : options) {
// Currently we support AES/CTR/NoPadding
CipherSuite suite = option.getCipherSuite();
if (suite == CipherSuite.AES_CTR_NOPADDING) {
int keyLen = conf.getInt(
DFS_ENCRYPT_DATA_TRANSFER_CIPHER_KEY_BITLENGTH_KEY,
DFS_ENCRYPT_DATA_TRANSFER_CIPHER_KEY_BITLENGTH_DEFAULT) / 8;
CryptoCodec codec = CryptoCodec.getInstance(conf, suite);
byte[] inKey = new byte[keyLen];
byte[] inIv = new byte[suite.getAlgorithmBlockSize()];
byte[] outKey = new byte[keyLen];
byte[] outIv = new byte[suite.getAlgorithmBlockSize()];
codec.generateSecureRandom(inKey);
codec.generateSecureRandom(inIv);
codec.generateSecureRandom(outKey);
codec.generateSecureRandom(outIv);
return new CipherOption(suite, inKey, inIv, outKey, outIv);
}
}
}
return null;
}
/**
* Send SASL message and negotiated cipher option to client.
*
* @param out stream to receive message
* @param payload to send
* @param option negotiated cipher option
* @throws IOException for any error
*/
public static void sendSaslMessageAndNegotiatedCipherOption(
OutputStream out, byte[] payload, CipherOption option)
throws IOException {
DataTransferEncryptorMessageProto.Builder builder =
DataTransferEncryptorMessageProto.newBuilder();
builder.setStatus(DataTransferEncryptorStatus.SUCCESS);
if (payload != null) {
builder.setPayload(ByteString.copyFrom(payload));
}
if (option != null) {
builder.addCipherOption(PBHelper.convert(option));
}
DataTransferEncryptorMessageProto proto = builder.build();
proto.writeDelimitedTo(out);
out.flush();
}
/**
* Create IOStreamPair of {@link org.apache.hadoop.crypto.CryptoInputStream}
* and {@link org.apache.hadoop.crypto.CryptoOutputStream}.
*
* @param conf the configuration
* @param cipherOption negotiated cipher option
* @param out underlying output stream
* @param in underlying input stream
* @param isServer is server side
* @return IOStreamPair the stream pair
* @throws IOException for any error
*/
public static IOStreamPair createStreamPair(Configuration conf,
CipherOption cipherOption, OutputStream out, InputStream in,
boolean isServer) throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("Creating IOStreamPair of CryptoInputStream and " +
"CryptoOutputStream.");
}
CryptoCodec codec = CryptoCodec.getInstance(conf,
cipherOption.getCipherSuite());
byte[] inKey = cipherOption.getInKey();
byte[] inIv = cipherOption.getInIv();
byte[] outKey = cipherOption.getOutKey();
byte[] outIv = cipherOption.getOutIv();
InputStream cIn = new CryptoInputStream(in, codec,
isServer ? inKey : outKey, isServer ? inIv : outIv);
OutputStream cOut = new CryptoOutputStream(out, codec,
isServer ? outKey : inKey, isServer ? outIv : inIv);
return new IOStreamPair(cIn, cOut);
}
/**
* Sends a SASL negotiation message indicating an error.
@@ -232,6 +360,116 @@
throws IOException {
sendSaslMessage(out, DataTransferEncryptorStatus.SUCCESS, payload, null);
}
/**
* Send a SASL negotiation message and negotiation cipher options to the server.
*
* @param out stream to receive message
* @param payload to send
* @param options cipher options to negotiate
* @throws IOException for any error
*/
public static void sendSaslMessageAndNegotiationCipherOptions(
OutputStream out, byte[] payload, List<CipherOption> options)
throws IOException {
DataTransferEncryptorMessageProto.Builder builder =
DataTransferEncryptorMessageProto.newBuilder();
builder.setStatus(DataTransferEncryptorStatus.SUCCESS);
if (payload != null) {
builder.setPayload(ByteString.copyFrom(payload));
}
if (options != null) {
builder.addAllCipherOption(PBHelper.convertCipherOptions(options));
}
DataTransferEncryptorMessageProto proto = builder.build();
proto.writeDelimitedTo(out);
out.flush();
}
/**
* Read the SASL message and negotiated cipher option from the server.
*
* @param in stream to read
* @return SaslResponseWithNegotiatedCipherOption SASL message and
* negotiated cipher option
* @throws IOException for any error
*/
public static SaslResponseWithNegotiatedCipherOption
readSaslMessageAndNegotiatedCipherOption(InputStream in)
throws IOException {
DataTransferEncryptorMessageProto proto =
DataTransferEncryptorMessageProto.parseFrom(vintPrefixed(in));
if (proto.getStatus() == DataTransferEncryptorStatus.ERROR_UNKNOWN_KEY) {
throw new InvalidEncryptionKeyException(proto.getMessage());
} else if (proto.getStatus() == DataTransferEncryptorStatus.ERROR) {
throw new IOException(proto.getMessage());
} else {
byte[] response = proto.getPayload().toByteArray();
List<CipherOption> options = PBHelper.convertCipherOptionProtos(
proto.getCipherOptionList());
CipherOption option = null;
if (options != null && !options.isEmpty()) {
option = options.get(0);
}
return new SaslResponseWithNegotiatedCipherOption(response, option);
}
}
/**
* Encrypt the key and iv of the negotiated cipher option.
*
* @param option negotiated cipher option
* @param sasl SASL participant representing server
* @return CipherOption negotiated cipher option which contains the
* encrypted key and iv
* @throws IOException for any error
*/
public static CipherOption wrap(CipherOption option, SaslParticipant sasl)
throws IOException {
if (option != null) {
byte[] inKey = option.getInKey();
if (inKey != null) {
inKey = sasl.wrap(inKey, 0, inKey.length);
}
byte[] outKey = option.getOutKey();
if (outKey != null) {
outKey = sasl.wrap(outKey, 0, outKey.length);
}
return new CipherOption(option.getCipherSuite(), inKey, option.getInIv(),
outKey, option.getOutIv());
}
return null;
}
/**
* Decrypt the key and iv of the negotiated cipher option.
*
* @param option negotiated cipher option
* @param sasl SASL participant representing client
* @return CipherOption negotiated cipher option which contains the
* decrypted key and iv
* @throws IOException for any error
*/
public static CipherOption unwrap(CipherOption option, SaslParticipant sasl)
throws IOException {
if (option != null) {
byte[] inKey = option.getInKey();
if (inKey != null) {
inKey = sasl.unwrap(inKey, 0, inKey.length);
}
byte[] outKey = option.getOutKey();
if (outKey != null) {
outKey = sasl.unwrap(outKey, 0, outKey.length);
}
return new CipherOption(option.getCipherSuite(), inKey, option.getInIv(),
outKey, option.getOutIv());
}
return null;
}
/**
* Sends a SASL negotiation message.
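
Putting these helpers together: the server reads the client's proposals, picks a suite it supports, and generates fresh key material. A minimal sketch of just the selection step (a default Configuration is assumed, so the key bitlength falls back to 128):

import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.CipherOption;
import org.apache.hadoop.crypto.CipherSuite;
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil;

import com.google.common.collect.Lists;

public class NegotiateSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // What a client would propose: AES/CTR/NoPadding, no key material.
    List<CipherOption> proposals = Lists.newArrayList(
        new CipherOption(CipherSuite.AES_CTR_NOPADDING));

    // The server keeps the suite and fills in random in/out keys and ivs,
    // sized by dfs.encrypt.data.transfer.cipher.key.bitlength.
    CipherOption chosen =
        DataTransferSaslUtil.negotiateCipherOption(conf, proposals);
    if (chosen != null) {
      System.out.println(chosen.getCipherSuite().getName() + " with "
          + (chosen.getInKey().length * 8) + "-bit keys");
    }
  }
}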

View File

@@ -17,7 +17,6 @@
*/
package org.apache.hadoop.hdfs.protocol.datatransfer.sasl;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY;
import static org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil.*;
import java.io.DataInputStream;
@@ -27,6 +26,7 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.Socket;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -40,6 +40,9 @@ import javax.security.sasl.RealmChoiceCallback;
import org.apache.commons.codec.binary.Base64;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.CipherOption;
import org.apache.hadoop.crypto.CipherSuite;
import org.apache.hadoop.hdfs.net.EncryptedPeer;
import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
@@ -54,6 +57,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Charsets;
import com.google.common.collect.Lists;
/**
* Negotiates SASL for DataTransferProtocol on behalf of a client. There are
@@ -72,6 +76,7 @@ public class SaslDataTransferClient {
private static final Logger LOG = LoggerFactory.getLogger(
SaslDataTransferClient.class);
private final Configuration conf;
private final AtomicBoolean fallbackToSimpleAuth;
private final SaslPropertiesResolver saslPropsResolver;
private final TrustedChannelResolver trustedChannelResolver;
@@ -82,27 +87,32 @@
* simple auth. For intra-cluster connections between data nodes in the same
* cluster, we can assume that all run under the same security configuration.
*
* @param conf the configuration
* @param saslPropsResolver for determining properties of SASL negotiation
* @param trustedChannelResolver for identifying trusted connections that do
* not require SASL negotiation
*/
public SaslDataTransferClient(SaslPropertiesResolver saslPropsResolver,
public SaslDataTransferClient(Configuration conf,
SaslPropertiesResolver saslPropsResolver,
TrustedChannelResolver trustedChannelResolver) {
this(saslPropsResolver, trustedChannelResolver, null);
this(conf, saslPropsResolver, trustedChannelResolver, null);
}
/**
* Creates a new SaslDataTransferClient.
*
* @param conf the configuration
* @param saslPropsResolver for determining properties of SASL negotiation
* @param trustedChannelResolver for identifying trusted connections that do
* not require SASL negotiation
* @param fallbackToSimpleAuth checked on each attempt at general SASL
* handshake, if true forces use of simple auth
*/
public SaslDataTransferClient(SaslPropertiesResolver saslPropsResolver,
public SaslDataTransferClient(Configuration conf,
SaslPropertiesResolver saslPropsResolver,
TrustedChannelResolver trustedChannelResolver,
AtomicBoolean fallbackToSimpleAuth) {
this.conf = conf;
this.fallbackToSimpleAuth = fallbackToSimpleAuth;
this.saslPropsResolver = saslPropsResolver;
this.trustedChannelResolver = trustedChannelResolver;
@@ -436,17 +446,38 @@ public class SaslDataTransferClient {
sendSaslMessage(out, new byte[0]);
// step 1
performSaslStep1(out, in, sasl);
// step 2 (client-side only)
byte[] remoteResponse = readSaslMessage(in);
byte[] localResponse = sasl.evaluateChallengeOrResponse(remoteResponse);
List<CipherOption> cipherOptions = null;
if (requestedQopContainsPrivacy(saslProps)) {
// Negotiate cipher options
CipherOption option = new CipherOption(CipherSuite.AES_CTR_NOPADDING);
cipherOptions = Lists.newArrayListWithCapacity(1);
cipherOptions.add(option);
}
sendSaslMessageAndNegotiationCipherOptions(out, localResponse,
cipherOptions);
// step 2 (client-side only)
SaslResponseWithNegotiatedCipherOption response =
readSaslMessageAndNegotiatedCipherOption(in);
localResponse = sasl.evaluateChallengeOrResponse(response.payload);
assert localResponse == null;
// SASL handshake is complete
checkSaslComplete(sasl, saslProps);
return sasl.createStreamPair(out, in);
CipherOption cipherOption = null;
if (sasl.isNegotiatedQopPrivacy()) {
// Unwrap the negotiated cipher option
cipherOption = unwrap(response.cipherOption, sasl);
}
// If negotiated cipher option is not null, we will use it to create
// stream pair.
return cipherOption != null ? createStreamPair(
conf, cipherOption, underlyingOut, underlyingIn, false) :
sasl.createStreamPair(out, in);
} catch (IOException ioe) {
sendGenericSaslErrorMessage(out, ioe.getMessage());
throw ioe;
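
The constructor change above ripples through every caller (DFSClient earlier, Dispatcher and DataNode below): the Configuration now rides along so the client side can build crypto streams after a privacy negotiation. A sketch of the wiring, with an illustrative fallback flag:

import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver;
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil;
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient;

public class ClientWiringSketch {
  static SaslDataTransferClient newSaslClient(Configuration conf) {
    return new SaslDataTransferClient(
        conf,
        DataTransferSaslUtil.getSaslPropertiesResolver(conf),
        TrustedChannelResolver.getInstance(conf),
        new AtomicBoolean(false)); // fallbackToSimpleAuth, illustrative
  }
}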

View File

@@ -26,6 +26,7 @@ import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.List;
import java.util.Map;
import javax.security.auth.callback.Callback;
@@ -39,6 +40,7 @@ import javax.security.sasl.SaslException;
import org.apache.commons.codec.binary.Base64;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.crypto.CipherOption;
import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
@@ -53,6 +55,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Charsets;
import com.google.common.collect.Lists;
/**
* Negotiates SASL for DataTransferProtocol on behalf of a server. There are
@@ -351,17 +354,40 @@ public class SaslDataTransferServer {
}
try {
// step 1
performSaslStep1(out, in, sasl);
// step 2 (server-side only)
byte[] remoteResponse = readSaslMessage(in);
byte[] localResponse = sasl.evaluateChallengeOrResponse(remoteResponse);
sendSaslMessage(out, localResponse);
// step 2 (server-side only)
List<CipherOption> cipherOptions = Lists.newArrayList();
remoteResponse = readSaslMessageAndNegotiationCipherOptions(
in, cipherOptions);
localResponse = sasl.evaluateChallengeOrResponse(remoteResponse);
// SASL handshake is complete
checkSaslComplete(sasl, saslProps);
return sasl.createStreamPair(out, in);
CipherOption cipherOption = null;
if (sasl.isNegotiatedQopPrivacy()) {
// Negotiate a cipher option
cipherOption = negotiateCipherOption(dnConf.getConf(), cipherOptions);
if (cipherOption != null) {
if (LOG.isDebugEnabled()) {
LOG.debug("Server using cipher suite " +
cipherOption.getCipherSuite().getName());
}
}
}
// If negotiated cipher option is not null, wrap it before sending.
sendSaslMessageAndNegotiatedCipherOption(out, localResponse,
wrap(cipherOption, sasl));
// If negotiated cipher option is not null, we will use it to create
// stream pair.
return cipherOption != null ? createStreamPair(
dnConf.getConf(), cipherOption, underlyingOut, underlyingIn, true) :
sasl.createStreamPair(out, in);
} catch (IOException ioe) {
if (ioe instanceof SaslException &&
ioe.getCause() != null &&

View File

@@ -129,6 +129,50 @@ class SaslParticipant {
return (String) saslServer.getNegotiatedProperty(Sasl.QOP);
}
}
/**
* After a successful SASL negotiation, returns whether the negotiated
* QOP is privacy (auth-conf).
*
* @return boolean true if the negotiated QOP is privacy
*/
public boolean isNegotiatedQopPrivacy() {
String qop = getNegotiatedQop();
return qop != null && "auth-conf".equalsIgnoreCase(qop);
}
/**
* Wraps a byte array.
*
* @param bytes The array containing the bytes to wrap.
* @param off The starting position in the array
* @param len The number of bytes to wrap
* @return byte[] wrapped bytes
* @throws SaslException if the bytes cannot be successfully wrapped
*/
public byte[] wrap(byte[] bytes, int off, int len) throws SaslException {
if (saslClient != null) {
return saslClient.wrap(bytes, off, len);
} else {
return saslServer.wrap(bytes, off, len);
}
}
/**
* Unwraps a byte array.
*
* @param bytes The array containing the bytes to unwrap.
* @param off The starting position in the array
* @param len The number of bytes to unwrap
* @return byte[] unwrapped bytes
* @throws SaslException if the bytes cannot be successfully unwrapped
*/
public byte[] unwrap(byte[] bytes, int off, int len) throws SaslException {
if (saslClient != null) {
return saslClient.unwrap(bytes, off, len);
} else {
return saslServer.unwrap(bytes, off, len);
}
}
/**
* Returns true if SASL negotiation is complete.
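
Both methods simply delegate to whichever javax.security.sasl participant is present. For orientation, here are the same calls made directly against the JDK SASL API (the participants are assumed to have already completed a handshake with qop=auth-conf):

import javax.security.sasl.SaslClient;
import javax.security.sasl.SaslException;
import javax.security.sasl.SaslServer;

public class SecurityLayerSketch {
  // Server side: protect freshly generated cipher keys before they cross
  // the wire (what DataTransferSaslUtil.wrap does via SaslParticipant).
  static byte[] protect(SaslServer server, byte[] key) throws SaslException {
    return server.wrap(key, 0, key.length);
  }

  // Client side: recover the keys on receipt (what
  // DataTransferSaslUtil.unwrap does via SaslParticipant).
  static byte[] recover(SaslClient client, byte[] wrapped)
      throws SaslException {
    return client.unwrap(wrapped, 0, wrapped.length);
  }
}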

View File

@@ -0,0 +1,33 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.protocol.datatransfer.sasl;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.crypto.CipherOption;
@InterfaceAudience.Private
public class SaslResponseWithNegotiatedCipherOption {
final byte[] payload;
final CipherOption cipherOption;
public SaslResponseWithNegotiatedCipherOption(byte[] payload,
CipherOption cipherOption) {
this.payload = payload;
this.cipherOption = cipherOption;
}
}

View File

@@ -58,6 +58,7 @@ import org.apache.hadoop.hdfs.protocol.CacheDirectiveStats;
import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
import org.apache.hadoop.hdfs.protocol.CachePoolStats;
import org.apache.hadoop.crypto.CipherOption;
import org.apache.hadoop.crypto.CipherSuite;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks;
@@ -128,6 +129,7 @@ import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockWithLocationsProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlocksWithLocationsProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.CheckpointCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.CheckpointSignatureProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.CipherOptionProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ContentSummaryProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.CorruptFileBlocksProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DataEncryptionKeyProto;
@@ -2689,6 +2691,83 @@ public class PBHelper {
return GetEditsFromTxidResponseProto.newBuilder().setEventsList(
builder.build()).build();
}
public static CipherOptionProto convert(CipherOption option) {
if (option != null) {
CipherOptionProto.Builder builder = CipherOptionProto.
newBuilder();
if (option.getCipherSuite() != null) {
builder.setSuite(convert(option.getCipherSuite()));
}
if (option.getInKey() != null) {
builder.setInKey(ByteString.copyFrom(option.getInKey()));
}
if (option.getInIv() != null) {
builder.setInIv(ByteString.copyFrom(option.getInIv()));
}
if (option.getOutKey() != null) {
builder.setOutKey(ByteString.copyFrom(option.getOutKey()));
}
if (option.getOutIv() != null) {
builder.setOutIv(ByteString.copyFrom(option.getOutIv()));
}
return builder.build();
}
return null;
}
public static CipherOption convert(CipherOptionProto proto) {
if (proto != null) {
CipherSuite suite = null;
if (proto.getSuite() != null) {
suite = convert(proto.getSuite());
}
byte[] inKey = null;
if (proto.getInKey() != null) {
inKey = proto.getInKey().toByteArray();
}
byte[] inIv = null;
if (proto.getInIv() != null) {
inIv = proto.getInIv().toByteArray();
}
byte[] outKey = null;
if (proto.getOutKey() != null) {
outKey = proto.getOutKey().toByteArray();
}
byte[] outIv = null;
if (proto.getOutIv() != null) {
outIv = proto.getOutIv().toByteArray();
}
return new CipherOption(suite, inKey, inIv, outKey, outIv);
}
return null;
}
public static List<CipherOptionProto> convertCipherOptions(
List<CipherOption> options) {
if (options != null) {
List<CipherOptionProto> protos =
Lists.newArrayListWithCapacity(options.size());
for (CipherOption option : options) {
protos.add(convert(option));
}
return protos;
}
return null;
}
public static List<CipherOption> convertCipherOptionProtos(
List<CipherOptionProto> protos) {
if (protos != null) {
List<CipherOption> options =
Lists.newArrayListWithCapacity(protos.size());
for (CipherOptionProto proto : protos) {
options.add(convert(proto));
}
return options;
}
return null;
}
public static CipherSuiteProto convert(CipherSuite suite) {
switch (suite) {
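
A round-trip sketch of the new converters (key and IV contents are zeroed placeholders):

import org.apache.hadoop.crypto.CipherOption;
import org.apache.hadoop.crypto.CipherSuite;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.CipherOptionProto;
import org.apache.hadoop.hdfs.protocolPB.PBHelper;

public class CipherOptionRoundTripSketch {
  public static void main(String[] args) {
    byte[] key = new byte[16], iv = new byte[16];
    CipherOption original = new CipherOption(
        CipherSuite.AES_CTR_NOPADDING, key, iv, key.clone(), iv.clone());

    // To protobuf (as carried in DataTransferEncryptorMessageProto) and back.
    CipherOptionProto proto = PBHelper.convert(original);
    CipherOption decoded = PBHelper.convert(proto);

    // Prints true: CipherSuite is an enum, so identity comparison works.
    System.out.println(
        decoded.getCipherSuite() == original.getCipherSuite());
  }
}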

View File

@@ -785,7 +785,7 @@ public class Dispatcher {
: Executors.newFixedThreadPool(dispatcherThreads);
this.maxConcurrentMovesPerNode = maxConcurrentMovesPerNode;
this.saslClient = new SaslDataTransferClient(
this.saslClient = new SaslDataTransferClient(conf,
DataTransferSaslUtil.getSaslPropertiesResolver(conf),
TrustedChannelResolver.getInstance(conf), nnc.fallbackToSimpleAuth);
}

View File

@@ -64,6 +64,7 @@ import org.apache.hadoop.security.SaslPropertiesResolver;
*/
@InterfaceAudience.Private
public class DNConf {
final Configuration conf;
final int socketTimeout;
final int socketWriteTimeout;
final int socketKeepaliveTimeout;
@@ -100,6 +101,7 @@ public class DNConf {
final long maxLockedMemory;
public DNConf(Configuration conf) {
this.conf = conf;
socketTimeout = conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY,
HdfsServerConstants.READ_TIMEOUT);
socketWriteTimeout = conf.getInt(DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY,
@@ -197,6 +199,15 @@ public class DNConf {
String getMinimumNameNodeVersion() {
return this.minimumNameNodeVersion;
}
/**
* Returns the configuration.
*
* @return Configuration the configuration
*/
public Configuration getConf() {
return conf;
}
/**
* Returns true if encryption enabled for DataTransferProtocol.

View File

@@ -1070,8 +1070,8 @@ public class DataNode extends ReconfigurableBase
// Create the ReadaheadPool from the DataNode context so we can
// exit without having to explicitly shutdown its thread pool.
readaheadPool = ReadaheadPool.getInstance();
saslClient = new SaslDataTransferClient(dnConf.saslPropsResolver,
dnConf.trustedChannelResolver);
saslClient = new SaslDataTransferClient(dnConf.conf,
dnConf.saslPropsResolver, dnConf.trustedChannelResolver);
saslServer = new SaslDataTransferServer(dnConf, blockPoolTokenSecretManager);
}

View File

@@ -42,6 +42,7 @@ message DataTransferEncryptorMessageProto {
required DataTransferEncryptorStatus status = 1;
optional bytes payload = 2;
optional string message = 3;
repeated CipherOptionProto cipherOption = 4;
}
message BaseHeaderProto {

View File

@@ -264,6 +264,17 @@ message ZoneEncryptionInfoProto {
required string keyName = 3;
}
/**
* Cipher option
*/
message CipherOptionProto {
required CipherSuiteProto suite = 1;
optional bytes inKey = 2;
optional bytes inIv = 3;
optional bytes outKey = 4;
optional bytes outIv = 5;
}
/**
* A set of file blocks and their locations.
*/

View File

@@ -1514,6 +1514,18 @@
the configured JCE default on the system is used (usually 3DES). It is
widely believed that 3DES is more cryptographically secure, but RC4 is
substantially faster.
Note that if AES is supported by both the client and server then this
encryption algorithm will only be used to initially transfer keys for AES.
</description>
</property>
<property>
<name>dfs.encrypt.data.transfer.cipher.key.bitlength</name>
<value>128</value>
<description>
The key bitlength negotiated between the DFS client and the DataNode for
encryption. This value may be set to 128, 192, or 256.
</description>
</property>
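
The same settings can be applied programmatically. A sketch using DFSConfigKeys constants (the bitlength key is the one added in this patch; 256-bit keys assume the JCE unlimited strength policy files are installed):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;

public class EncryptedTransferConfigSketch {
  static Configuration aes256Conf() {
    Configuration conf = new Configuration();
    // Turn on encrypted data transfer and request a 256-bit AES key.
    conf.setBoolean(DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY, true);
    conf.setInt(
        DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_KEY_BITLENGTH_KEY,
        256);
    return conf;
  }
}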

View File

@@ -37,11 +37,15 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver;
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil;
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferServer;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.GenericTestUtils.LogCapturer;
import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@@ -50,6 +54,10 @@ import org.mockito.Mockito;
@RunWith(Parameterized.class)
public class TestEncryptedTransfer {
{
LogManager.getLogger(SaslDataTransferServer.class).setLevel(Level.DEBUG);
LogManager.getLogger(DataTransferSaslUtil.class).setLevel(Level.DEBUG);
}
@Parameters
public static Collection<Object[]> data() {
@@ -111,9 +119,28 @@ public class TestEncryptedTransfer {
.build();
fs = getFileSystem(conf);
assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
assertEquals(checksum, fs.getFileChecksum(TEST_PATH));
LogCapturer logs = GenericTestUtils.LogCapturer.captureLogs(
LogFactory.getLog(SaslDataTransferServer.class));
LogCapturer logs1 = GenericTestUtils.LogCapturer.captureLogs(
LogFactory.getLog(DataTransferSaslUtil.class));
try {
assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
assertEquals(checksum, fs.getFileChecksum(TEST_PATH));
} finally {
logs.stopCapturing();
logs1.stopCapturing();
}
fs.close();
if (resolverClazz != null && !resolverClazz.endsWith("TestTrustedChannelResolver")) {
// Test that client and server negotiate a cipher option
GenericTestUtils.assertMatches(logs.getOutput(),
"Server using cipher suite");
// Check the IOStreamPair
GenericTestUtils.assertMatches(logs1.getOutput(),
"Creating IOStreamPair of CryptoInputStream and CryptoOutputStream.");
}
} finally {
if (cluster != null) {
cluster.shutdown();
@@ -403,9 +430,28 @@ public class TestEncryptedTransfer {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDns).build();
FileSystem fs = getFileSystem(conf);
writeTestDataToFile(fs);
LogCapturer logs = GenericTestUtils.LogCapturer.captureLogs(
LogFactory.getLog(SaslDataTransferServer.class));
LogCapturer logs1 = GenericTestUtils.LogCapturer.captureLogs(
LogFactory.getLog(DataTransferSaslUtil.class));
try {
writeTestDataToFile(fs);
} finally {
logs.stopCapturing();
logs1.stopCapturing();
}
assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
fs.close();
if (resolverClazz != null && !resolverClazz.endsWith("TestTrustedChannelResolver")) {
// Test that client and server negotiate a cipher option
GenericTestUtils.assertMatches(logs.getOutput(),
"Server using cipher suite");
// Check the IOStreamPair
GenericTestUtils.assertMatches(logs1.getOutput(),
"Creating IOStreamPair of CryptoInputStream and CryptoOutputStream.");
}
} finally {
if (cluster != null) {
cluster.shutdown();