HDFS-6606. Optimize HDFS Encrypted Transport performance. (yliu)
This commit is contained in:
parent
9548bb360b
commit
8b812a35b8
|
@ -0,0 +1,66 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.crypto;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Used between client and server to negotiate the
|
||||||
|
* cipher suite, key and iv.
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
public class CipherOption {
|
||||||
|
private final CipherSuite suite;
|
||||||
|
private final byte[] inKey;
|
||||||
|
private final byte[] inIv;
|
||||||
|
private final byte[] outKey;
|
||||||
|
private final byte[] outIv;
|
||||||
|
|
||||||
|
public CipherOption(CipherSuite suite) {
|
||||||
|
this(suite, null, null, null, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
public CipherOption(CipherSuite suite, byte[] inKey, byte[] inIv,
|
||||||
|
byte[] outKey, byte[] outIv) {
|
||||||
|
this.suite = suite;
|
||||||
|
this.inKey = inKey;
|
||||||
|
this.inIv = inIv;
|
||||||
|
this.outKey = outKey;
|
||||||
|
this.outIv = outIv;
|
||||||
|
}
|
||||||
|
|
||||||
|
public CipherSuite getCipherSuite() {
|
||||||
|
return suite;
|
||||||
|
}
|
||||||
|
|
||||||
|
public byte[] getInKey() {
|
||||||
|
return inKey;
|
||||||
|
}
|
||||||
|
|
||||||
|
public byte[] getInIv() {
|
||||||
|
return inIv;
|
||||||
|
}
|
||||||
|
|
||||||
|
public byte[] getOutKey() {
|
||||||
|
return outKey;
|
||||||
|
}
|
||||||
|
|
||||||
|
public byte[] getOutIv() {
|
||||||
|
return outIv;
|
||||||
|
}
|
||||||
|
}
|
|
@ -23,6 +23,7 @@ import java.io.FilterInputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
|
import java.nio.channels.ReadableByteChannel;
|
||||||
import java.security.GeneralSecurityException;
|
import java.security.GeneralSecurityException;
|
||||||
import java.util.EnumSet;
|
import java.util.EnumSet;
|
||||||
import java.util.Queue;
|
import java.util.Queue;
|
||||||
|
@ -57,7 +58,8 @@ import com.google.common.base.Preconditions;
|
||||||
@InterfaceStability.Evolving
|
@InterfaceStability.Evolving
|
||||||
public class CryptoInputStream extends FilterInputStream implements
|
public class CryptoInputStream extends FilterInputStream implements
|
||||||
Seekable, PositionedReadable, ByteBufferReadable, HasFileDescriptor,
|
Seekable, PositionedReadable, ByteBufferReadable, HasFileDescriptor,
|
||||||
CanSetDropBehind, CanSetReadahead, HasEnhancedByteBufferAccess {
|
CanSetDropBehind, CanSetReadahead, HasEnhancedByteBufferAccess,
|
||||||
|
ReadableByteChannel {
|
||||||
private static final byte[] oneByteBuf = new byte[1];
|
private static final byte[] oneByteBuf = new byte[1];
|
||||||
private final CryptoCodec codec;
|
private final CryptoCodec codec;
|
||||||
private final Decryptor decryptor;
|
private final Decryptor decryptor;
|
||||||
|
@ -92,6 +94,8 @@ public class CryptoInputStream extends FilterInputStream implements
|
||||||
private final byte[] key;
|
private final byte[] key;
|
||||||
private final byte[] initIV;
|
private final byte[] initIV;
|
||||||
private byte[] iv;
|
private byte[] iv;
|
||||||
|
private final boolean isByteBufferReadable;
|
||||||
|
private final boolean isReadableByteChannel;
|
||||||
|
|
||||||
/** DirectBuffer pool */
|
/** DirectBuffer pool */
|
||||||
private final Queue<ByteBuffer> bufferPool =
|
private final Queue<ByteBuffer> bufferPool =
|
||||||
|
@ -115,6 +119,8 @@ public class CryptoInputStream extends FilterInputStream implements
|
||||||
this.initIV = iv.clone();
|
this.initIV = iv.clone();
|
||||||
this.iv = iv.clone();
|
this.iv = iv.clone();
|
||||||
this.streamOffset = streamOffset;
|
this.streamOffset = streamOffset;
|
||||||
|
isByteBufferReadable = in instanceof ByteBufferReadable;
|
||||||
|
isReadableByteChannel = in instanceof ReadableByteChannel;
|
||||||
inBuffer = ByteBuffer.allocateDirect(this.bufferSize);
|
inBuffer = ByteBuffer.allocateDirect(this.bufferSize);
|
||||||
outBuffer = ByteBuffer.allocateDirect(this.bufferSize);
|
outBuffer = ByteBuffer.allocateDirect(this.bufferSize);
|
||||||
decryptor = getDecryptor();
|
decryptor = getDecryptor();
|
||||||
|
@ -165,9 +171,11 @@ public class CryptoInputStream extends FilterInputStream implements
|
||||||
* it can avoid bytes copy.
|
* it can avoid bytes copy.
|
||||||
*/
|
*/
|
||||||
if (usingByteBufferRead == null) {
|
if (usingByteBufferRead == null) {
|
||||||
if (in instanceof ByteBufferReadable) {
|
if (isByteBufferReadable || isReadableByteChannel) {
|
||||||
try {
|
try {
|
||||||
n = ((ByteBufferReadable) in).read(inBuffer);
|
n = isByteBufferReadable ?
|
||||||
|
((ByteBufferReadable) in).read(inBuffer) :
|
||||||
|
((ReadableByteChannel) in).read(inBuffer);
|
||||||
usingByteBufferRead = Boolean.TRUE;
|
usingByteBufferRead = Boolean.TRUE;
|
||||||
} catch (UnsupportedOperationException e) {
|
} catch (UnsupportedOperationException e) {
|
||||||
usingByteBufferRead = Boolean.FALSE;
|
usingByteBufferRead = Boolean.FALSE;
|
||||||
|
@ -180,7 +188,8 @@ public class CryptoInputStream extends FilterInputStream implements
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if (usingByteBufferRead) {
|
if (usingByteBufferRead) {
|
||||||
n = ((ByteBufferReadable) in).read(inBuffer);
|
n = isByteBufferReadable ? ((ByteBufferReadable) in).read(inBuffer) :
|
||||||
|
((ReadableByteChannel) in).read(inBuffer);
|
||||||
} else {
|
} else {
|
||||||
n = readFromUnderlyingStream(inBuffer);
|
n = readFromUnderlyingStream(inBuffer);
|
||||||
}
|
}
|
||||||
|
@ -450,7 +459,7 @@ public class CryptoInputStream extends FilterInputStream implements
|
||||||
@Override
|
@Override
|
||||||
public int read(ByteBuffer buf) throws IOException {
|
public int read(ByteBuffer buf) throws IOException {
|
||||||
checkStream();
|
checkStream();
|
||||||
if (in instanceof ByteBufferReadable) {
|
if (isByteBufferReadable || isReadableByteChannel) {
|
||||||
final int unread = outBuffer.remaining();
|
final int unread = outBuffer.remaining();
|
||||||
if (unread > 0) { // Have unread decrypted data in buffer.
|
if (unread > 0) { // Have unread decrypted data in buffer.
|
||||||
int toRead = buf.remaining();
|
int toRead = buf.remaining();
|
||||||
|
@ -466,7 +475,8 @@ public class CryptoInputStream extends FilterInputStream implements
|
||||||
}
|
}
|
||||||
|
|
||||||
final int pos = buf.position();
|
final int pos = buf.position();
|
||||||
final int n = ((ByteBufferReadable) in).read(buf);
|
final int n = isByteBufferReadable ? ((ByteBufferReadable) in).read(buf) :
|
||||||
|
((ReadableByteChannel) in).read(buf);
|
||||||
if (n > 0) {
|
if (n > 0) {
|
||||||
streamOffset += n; // Read n bytes
|
streamOffset += n; // Read n bytes
|
||||||
decrypt(buf, n, pos);
|
decrypt(buf, n, pos);
|
||||||
|
@ -481,10 +491,22 @@ public class CryptoInputStream extends FilterInputStream implements
|
||||||
return unread;
|
return unread;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
int n = 0;
|
||||||
|
if (buf.hasArray()) {
|
||||||
|
n = read(buf.array(), buf.position(), buf.remaining());
|
||||||
|
if (n > 0) {
|
||||||
|
buf.position(buf.position() + n);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
byte[] tmp = new byte[buf.remaining()];
|
||||||
|
n = read(tmp);
|
||||||
|
if (n > 0) {
|
||||||
|
buf.put(tmp, 0, n);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return n;
|
||||||
}
|
}
|
||||||
|
|
||||||
throw new UnsupportedOperationException("ByteBuffer read unsupported " +
|
|
||||||
"by input stream.");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -686,4 +708,9 @@ public class CryptoInputStream extends FilterInputStream implements
|
||||||
decryptorPool.add(decryptor);
|
decryptorPool.add(decryptor);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isOpen() {
|
||||||
|
return !closed;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -407,6 +407,8 @@ Release 2.6.0 - UNRELEASED
|
||||||
HDFS-7122. Use of ThreadLocal<Random> results in poor block placement.
|
HDFS-7122. Use of ThreadLocal<Random> results in poor block placement.
|
||||||
(wang)
|
(wang)
|
||||||
|
|
||||||
|
HDFS-6606. Optimize HDFS Encrypted Transport performance. (yliu)
|
||||||
|
|
||||||
BUG FIXES
|
BUG FIXES
|
||||||
|
|
||||||
HDFS-6823. dfs.web.authentication.kerberos.principal shows up in logs for
|
HDFS-6823. dfs.web.authentication.kerberos.principal shows up in logs for
|
||||||
|
|
|
@ -693,7 +693,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
|
||||||
this.initThreadsNumForHedgedReads(numThreads);
|
this.initThreadsNumForHedgedReads(numThreads);
|
||||||
}
|
}
|
||||||
this.saslClient = new SaslDataTransferClient(
|
this.saslClient = new SaslDataTransferClient(
|
||||||
DataTransferSaslUtil.getSaslPropertiesResolver(conf),
|
conf, DataTransferSaslUtil.getSaslPropertiesResolver(conf),
|
||||||
TrustedChannelResolver.getInstance(conf), nnFallbackToSimpleAuth);
|
TrustedChannelResolver.getInstance(conf), nnFallbackToSimpleAuth);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -606,6 +606,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
|
||||||
// Security-related configs
|
// Security-related configs
|
||||||
public static final String DFS_ENCRYPT_DATA_TRANSFER_KEY = "dfs.encrypt.data.transfer";
|
public static final String DFS_ENCRYPT_DATA_TRANSFER_KEY = "dfs.encrypt.data.transfer";
|
||||||
public static final boolean DFS_ENCRYPT_DATA_TRANSFER_DEFAULT = false;
|
public static final boolean DFS_ENCRYPT_DATA_TRANSFER_DEFAULT = false;
|
||||||
|
public static final String DFS_ENCRYPT_DATA_TRANSFER_CIPHER_KEY_BITLENGTH_KEY = "dfs.encrypt.data.transfer.cipher.key.bitlength";
|
||||||
|
public static final int DFS_ENCRYPT_DATA_TRANSFER_CIPHER_KEY_BITLENGTH_DEFAULT = 128;
|
||||||
public static final String DFS_DATA_ENCRYPTION_ALGORITHM_KEY = "dfs.encrypt.data.transfer.algorithm";
|
public static final String DFS_DATA_ENCRYPTION_ALGORITHM_KEY = "dfs.encrypt.data.transfer.algorithm";
|
||||||
public static final String DFS_TRUSTEDCHANNEL_RESOLVER_CLASS = "dfs.trustedchannel.resolver.class";
|
public static final String DFS_TRUSTEDCHANNEL_RESOLVER_CLASS = "dfs.trustedchannel.resolver.class";
|
||||||
public static final String DFS_DATA_TRANSFER_PROTECTION_KEY = "dfs.data.transfer.protection";
|
public static final String DFS_DATA_TRANSFER_PROTECTION_KEY = "dfs.data.transfer.protection";
|
||||||
|
|
|
@ -21,6 +21,8 @@ import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_RPC_PROT
|
||||||
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_SASL_PROPS_RESOLVER_CLASS;
|
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_SASL_PROPS_RESOLVER_CLASS;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_TRANSFER_SASL_PROPS_RESOLVER_CLASS_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_TRANSFER_SASL_PROPS_RESOLVER_CLASS_KEY;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_KEY_BITLENGTH_KEY;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_KEY_BITLENGTH_DEFAULT;
|
||||||
import static org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed;
|
import static org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
@ -28,6 +30,7 @@ import java.io.InputStream;
|
||||||
import java.io.OutputStream;
|
import java.io.OutputStream;
|
||||||
import java.net.InetAddress;
|
import java.net.InetAddress;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import javax.security.sasl.Sasl;
|
import javax.security.sasl.Sasl;
|
||||||
|
@ -35,10 +38,18 @@ import javax.security.sasl.Sasl;
|
||||||
import org.apache.commons.codec.binary.Base64;
|
import org.apache.commons.codec.binary.Base64;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.crypto.CipherOption;
|
||||||
|
import org.apache.hadoop.crypto.CipherSuite;
|
||||||
|
import org.apache.hadoop.crypto.CryptoCodec;
|
||||||
|
import org.apache.hadoop.crypto.CryptoInputStream;
|
||||||
|
import org.apache.hadoop.crypto.CryptoOutputStream;
|
||||||
import org.apache.hadoop.hdfs.net.Peer;
|
import org.apache.hadoop.hdfs.net.Peer;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
|
||||||
import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
|
import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto;
|
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto.DataTransferEncryptorStatus;
|
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto.DataTransferEncryptorStatus;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.CipherOptionProto;
|
||||||
|
import org.apache.hadoop.hdfs.protocolPB.PBHelper;
|
||||||
import org.apache.hadoop.security.SaslPropertiesResolver;
|
import org.apache.hadoop.security.SaslPropertiesResolver;
|
||||||
import org.apache.hadoop.security.SaslRpcServer.QualityOfProtection;
|
import org.apache.hadoop.security.SaslRpcServer.QualityOfProtection;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
|
@ -96,6 +107,19 @@ public final class DataTransferSaslUtil {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Check whether requested SASL Qop contains privacy.
|
||||||
|
*
|
||||||
|
* @param saslProps properties of SASL negotiation
|
||||||
|
* @return boolean true if privacy exists
|
||||||
|
*/
|
||||||
|
public static boolean requestedQopContainsPrivacy(
|
||||||
|
Map<String, String> saslProps) {
|
||||||
|
Set<String> requestedQop = ImmutableSet.copyOf(Arrays.asList(
|
||||||
|
saslProps.get(Sasl.QOP).split(",")));
|
||||||
|
return requestedQop.contains("auth-conf");
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates SASL properties required for an encrypted SASL negotiation.
|
* Creates SASL properties required for an encrypted SASL negotiation.
|
||||||
*
|
*
|
||||||
|
@ -176,20 +200,6 @@ public final class DataTransferSaslUtil {
|
||||||
return resolver;
|
return resolver;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Performs the first step of SASL negotiation.
|
|
||||||
*
|
|
||||||
* @param out connection output stream
|
|
||||||
* @param in connection input stream
|
|
||||||
* @param sasl participant
|
|
||||||
*/
|
|
||||||
public static void performSaslStep1(OutputStream out, InputStream in,
|
|
||||||
SaslParticipant sasl) throws IOException {
|
|
||||||
byte[] remoteResponse = readSaslMessage(in);
|
|
||||||
byte[] localResponse = sasl.evaluateChallengeOrResponse(remoteResponse);
|
|
||||||
sendSaslMessage(out, localResponse);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Reads a SASL negotiation message.
|
* Reads a SASL negotiation message.
|
||||||
*
|
*
|
||||||
|
@ -209,6 +219,124 @@ public final class DataTransferSaslUtil {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Reads a SASL negotiation message and negotiation cipher options.
|
||||||
|
*
|
||||||
|
* @param in stream to read
|
||||||
|
* @param cipherOptions list to store negotiation cipher options
|
||||||
|
* @return byte[] SASL negotiation message
|
||||||
|
* @throws IOException for any error
|
||||||
|
*/
|
||||||
|
public static byte[] readSaslMessageAndNegotiationCipherOptions(
|
||||||
|
InputStream in, List<CipherOption> cipherOptions) throws IOException {
|
||||||
|
DataTransferEncryptorMessageProto proto =
|
||||||
|
DataTransferEncryptorMessageProto.parseFrom(vintPrefixed(in));
|
||||||
|
if (proto.getStatus() == DataTransferEncryptorStatus.ERROR_UNKNOWN_KEY) {
|
||||||
|
throw new InvalidEncryptionKeyException(proto.getMessage());
|
||||||
|
} else if (proto.getStatus() == DataTransferEncryptorStatus.ERROR) {
|
||||||
|
throw new IOException(proto.getMessage());
|
||||||
|
} else {
|
||||||
|
List<CipherOptionProto> optionProtos = proto.getCipherOptionList();
|
||||||
|
if (optionProtos != null) {
|
||||||
|
for (CipherOptionProto optionProto : optionProtos) {
|
||||||
|
cipherOptions.add(PBHelper.convert(optionProto));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return proto.getPayload().toByteArray();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Negotiate a cipher option which server supports.
|
||||||
|
*
|
||||||
|
* @param options the cipher options which client supports
|
||||||
|
* @return CipherOption negotiated cipher option
|
||||||
|
*/
|
||||||
|
public static CipherOption negotiateCipherOption(Configuration conf,
|
||||||
|
List<CipherOption> options) {
|
||||||
|
if (options != null) {
|
||||||
|
for (CipherOption option : options) {
|
||||||
|
// Currently we support AES/CTR/NoPadding
|
||||||
|
CipherSuite suite = option.getCipherSuite();
|
||||||
|
if (suite == CipherSuite.AES_CTR_NOPADDING) {
|
||||||
|
int keyLen = conf.getInt(
|
||||||
|
DFS_ENCRYPT_DATA_TRANSFER_CIPHER_KEY_BITLENGTH_KEY,
|
||||||
|
DFS_ENCRYPT_DATA_TRANSFER_CIPHER_KEY_BITLENGTH_DEFAULT) / 8;
|
||||||
|
CryptoCodec codec = CryptoCodec.getInstance(conf, suite);
|
||||||
|
byte[] inKey = new byte[keyLen];
|
||||||
|
byte[] inIv = new byte[suite.getAlgorithmBlockSize()];
|
||||||
|
byte[] outKey = new byte[keyLen];
|
||||||
|
byte[] outIv = new byte[suite.getAlgorithmBlockSize()];
|
||||||
|
codec.generateSecureRandom(inKey);
|
||||||
|
codec.generateSecureRandom(inIv);
|
||||||
|
codec.generateSecureRandom(outKey);
|
||||||
|
codec.generateSecureRandom(outIv);
|
||||||
|
return new CipherOption(suite, inKey, inIv, outKey, outIv);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Send SASL message and negotiated cipher option to client.
|
||||||
|
*
|
||||||
|
* @param out stream to receive message
|
||||||
|
* @param payload to send
|
||||||
|
* @param option negotiated cipher option
|
||||||
|
* @throws IOException for any error
|
||||||
|
*/
|
||||||
|
public static void sendSaslMessageAndNegotiatedCipherOption(
|
||||||
|
OutputStream out, byte[] payload, CipherOption option)
|
||||||
|
throws IOException {
|
||||||
|
DataTransferEncryptorMessageProto.Builder builder =
|
||||||
|
DataTransferEncryptorMessageProto.newBuilder();
|
||||||
|
|
||||||
|
builder.setStatus(DataTransferEncryptorStatus.SUCCESS);
|
||||||
|
if (payload != null) {
|
||||||
|
builder.setPayload(ByteString.copyFrom(payload));
|
||||||
|
}
|
||||||
|
if (option != null) {
|
||||||
|
builder.addCipherOption(PBHelper.convert(option));
|
||||||
|
}
|
||||||
|
|
||||||
|
DataTransferEncryptorMessageProto proto = builder.build();
|
||||||
|
proto.writeDelimitedTo(out);
|
||||||
|
out.flush();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create IOStreamPair of {@link org.apache.hadoop.crypto.CryptoInputStream}
|
||||||
|
* and {@link org.apache.hadoop.crypto.CryptoOutputStream}
|
||||||
|
*
|
||||||
|
* @param conf the configuration
|
||||||
|
* @param cipherOption negotiated cipher option
|
||||||
|
* @param out underlying output stream
|
||||||
|
* @param in underlying input stream
|
||||||
|
* @param isServer is server side
|
||||||
|
* @return IOStreamPair the stream pair
|
||||||
|
* @throws IOException for any error
|
||||||
|
*/
|
||||||
|
public static IOStreamPair createStreamPair(Configuration conf,
|
||||||
|
CipherOption cipherOption, OutputStream out, InputStream in,
|
||||||
|
boolean isServer) throws IOException {
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("Creating IOStreamPair of CryptoInputStream and " +
|
||||||
|
"CryptoOutputStream.");
|
||||||
|
}
|
||||||
|
CryptoCodec codec = CryptoCodec.getInstance(conf,
|
||||||
|
cipherOption.getCipherSuite());
|
||||||
|
byte[] inKey = cipherOption.getInKey();
|
||||||
|
byte[] inIv = cipherOption.getInIv();
|
||||||
|
byte[] outKey = cipherOption.getOutKey();
|
||||||
|
byte[] outIv = cipherOption.getOutIv();
|
||||||
|
InputStream cIn = new CryptoInputStream(in, codec,
|
||||||
|
isServer ? inKey : outKey, isServer ? inIv : outIv);
|
||||||
|
OutputStream cOut = new CryptoOutputStream(out, codec,
|
||||||
|
isServer ? outKey : inKey, isServer ? outIv : inIv);
|
||||||
|
return new IOStreamPair(cIn, cOut);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Sends a SASL negotiation message indicating an error.
|
* Sends a SASL negotiation message indicating an error.
|
||||||
*
|
*
|
||||||
|
@ -233,6 +361,116 @@ public final class DataTransferSaslUtil {
|
||||||
sendSaslMessage(out, DataTransferEncryptorStatus.SUCCESS, payload, null);
|
sendSaslMessage(out, DataTransferEncryptorStatus.SUCCESS, payload, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Send a SASL negotiation message and negotiation cipher options to server.
|
||||||
|
*
|
||||||
|
* @param out stream to receive message
|
||||||
|
* @param payload to send
|
||||||
|
* @param options cipher options to negotiate
|
||||||
|
* @throws IOException for any error
|
||||||
|
*/
|
||||||
|
public static void sendSaslMessageAndNegotiationCipherOptions(
|
||||||
|
OutputStream out, byte[] payload, List<CipherOption> options)
|
||||||
|
throws IOException {
|
||||||
|
DataTransferEncryptorMessageProto.Builder builder =
|
||||||
|
DataTransferEncryptorMessageProto.newBuilder();
|
||||||
|
|
||||||
|
builder.setStatus(DataTransferEncryptorStatus.SUCCESS);
|
||||||
|
if (payload != null) {
|
||||||
|
builder.setPayload(ByteString.copyFrom(payload));
|
||||||
|
}
|
||||||
|
if (options != null) {
|
||||||
|
builder.addAllCipherOption(PBHelper.convertCipherOptions(options));
|
||||||
|
}
|
||||||
|
|
||||||
|
DataTransferEncryptorMessageProto proto = builder.build();
|
||||||
|
proto.writeDelimitedTo(out);
|
||||||
|
out.flush();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Read SASL message and negotiated cipher option from server.
|
||||||
|
*
|
||||||
|
* @param in stream to read
|
||||||
|
* @return SaslResponseWithNegotiatedCipherOption SASL message and
|
||||||
|
* negotiated cipher option
|
||||||
|
* @throws IOException for any error
|
||||||
|
*/
|
||||||
|
public static SaslResponseWithNegotiatedCipherOption
|
||||||
|
readSaslMessageAndNegotiatedCipherOption(InputStream in)
|
||||||
|
throws IOException {
|
||||||
|
DataTransferEncryptorMessageProto proto =
|
||||||
|
DataTransferEncryptorMessageProto.parseFrom(vintPrefixed(in));
|
||||||
|
if (proto.getStatus() == DataTransferEncryptorStatus.ERROR_UNKNOWN_KEY) {
|
||||||
|
throw new InvalidEncryptionKeyException(proto.getMessage());
|
||||||
|
} else if (proto.getStatus() == DataTransferEncryptorStatus.ERROR) {
|
||||||
|
throw new IOException(proto.getMessage());
|
||||||
|
} else {
|
||||||
|
byte[] response = proto.getPayload().toByteArray();
|
||||||
|
List<CipherOption> options = PBHelper.convertCipherOptionProtos(
|
||||||
|
proto.getCipherOptionList());
|
||||||
|
CipherOption option = null;
|
||||||
|
if (options != null && !options.isEmpty()) {
|
||||||
|
option = options.get(0);
|
||||||
|
}
|
||||||
|
return new SaslResponseWithNegotiatedCipherOption(response, option);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Encrypt the key and iv of the negotiated cipher option.
|
||||||
|
*
|
||||||
|
* @param option negotiated cipher option
|
||||||
|
* @param sasl SASL participant representing server
|
||||||
|
* @return CipherOption negotiated cipher option which contains the
|
||||||
|
* encrypted key and iv
|
||||||
|
* @throws IOException for any error
|
||||||
|
*/
|
||||||
|
public static CipherOption wrap(CipherOption option, SaslParticipant sasl)
|
||||||
|
throws IOException {
|
||||||
|
if (option != null) {
|
||||||
|
byte[] inKey = option.getInKey();
|
||||||
|
if (inKey != null) {
|
||||||
|
inKey = sasl.wrap(inKey, 0, inKey.length);
|
||||||
|
}
|
||||||
|
byte[] outKey = option.getOutKey();
|
||||||
|
if (outKey != null) {
|
||||||
|
outKey = sasl.wrap(outKey, 0, outKey.length);
|
||||||
|
}
|
||||||
|
return new CipherOption(option.getCipherSuite(), inKey, option.getInIv(),
|
||||||
|
outKey, option.getOutIv());
|
||||||
|
}
|
||||||
|
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Decrypt the key and iv of the negotiated cipher option.
|
||||||
|
*
|
||||||
|
* @param option negotiated cipher option
|
||||||
|
* @param sasl SASL participant representing client
|
||||||
|
* @return CipherOption negotiated cipher option which contains the
|
||||||
|
* decrypted key and iv
|
||||||
|
* @throws IOException for any error
|
||||||
|
*/
|
||||||
|
public static CipherOption unwrap(CipherOption option, SaslParticipant sasl)
|
||||||
|
throws IOException {
|
||||||
|
if (option != null) {
|
||||||
|
byte[] inKey = option.getInKey();
|
||||||
|
if (inKey != null) {
|
||||||
|
inKey = sasl.unwrap(inKey, 0, inKey.length);
|
||||||
|
}
|
||||||
|
byte[] outKey = option.getOutKey();
|
||||||
|
if (outKey != null) {
|
||||||
|
outKey = sasl.unwrap(outKey, 0, outKey.length);
|
||||||
|
}
|
||||||
|
return new CipherOption(option.getCipherSuite(), inKey, option.getInIv(),
|
||||||
|
outKey, option.getOutIv());
|
||||||
|
}
|
||||||
|
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Sends a SASL negotiation message.
|
* Sends a SASL negotiation message.
|
||||||
*
|
*
|
||||||
|
|
|
@ -17,7 +17,6 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hdfs.protocol.datatransfer.sasl;
|
package org.apache.hadoop.hdfs.protocol.datatransfer.sasl;
|
||||||
|
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY;
|
|
||||||
import static org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil.*;
|
import static org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil.*;
|
||||||
|
|
||||||
import java.io.DataInputStream;
|
import java.io.DataInputStream;
|
||||||
|
@ -27,6 +26,7 @@ import java.io.InputStream;
|
||||||
import java.io.OutputStream;
|
import java.io.OutputStream;
|
||||||
import java.net.InetAddress;
|
import java.net.InetAddress;
|
||||||
import java.net.Socket;
|
import java.net.Socket;
|
||||||
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
|
@ -40,6 +40,9 @@ import javax.security.sasl.RealmChoiceCallback;
|
||||||
|
|
||||||
import org.apache.commons.codec.binary.Base64;
|
import org.apache.commons.codec.binary.Base64;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.crypto.CipherOption;
|
||||||
|
import org.apache.hadoop.crypto.CipherSuite;
|
||||||
import org.apache.hadoop.hdfs.net.EncryptedPeer;
|
import org.apache.hadoop.hdfs.net.EncryptedPeer;
|
||||||
import org.apache.hadoop.hdfs.net.Peer;
|
import org.apache.hadoop.hdfs.net.Peer;
|
||||||
import org.apache.hadoop.hdfs.protocol.DatanodeID;
|
import org.apache.hadoop.hdfs.protocol.DatanodeID;
|
||||||
|
@ -54,6 +57,7 @@ import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import com.google.common.base.Charsets;
|
import com.google.common.base.Charsets;
|
||||||
|
import com.google.common.collect.Lists;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Negotiates SASL for DataTransferProtocol on behalf of a client. There are
|
* Negotiates SASL for DataTransferProtocol on behalf of a client. There are
|
||||||
|
@ -72,6 +76,7 @@ public class SaslDataTransferClient {
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(
|
private static final Logger LOG = LoggerFactory.getLogger(
|
||||||
SaslDataTransferClient.class);
|
SaslDataTransferClient.class);
|
||||||
|
|
||||||
|
private final Configuration conf;
|
||||||
private final AtomicBoolean fallbackToSimpleAuth;
|
private final AtomicBoolean fallbackToSimpleAuth;
|
||||||
private final SaslPropertiesResolver saslPropsResolver;
|
private final SaslPropertiesResolver saslPropsResolver;
|
||||||
private final TrustedChannelResolver trustedChannelResolver;
|
private final TrustedChannelResolver trustedChannelResolver;
|
||||||
|
@ -82,27 +87,32 @@ public class SaslDataTransferClient {
|
||||||
* simple auth. For intra-cluster connections between data nodes in the same
|
* simple auth. For intra-cluster connections between data nodes in the same
|
||||||
* cluster, we can assume that all run under the same security configuration.
|
* cluster, we can assume that all run under the same security configuration.
|
||||||
*
|
*
|
||||||
|
* @param conf the configuration
|
||||||
* @param saslPropsResolver for determining properties of SASL negotiation
|
* @param saslPropsResolver for determining properties of SASL negotiation
|
||||||
* @param trustedChannelResolver for identifying trusted connections that do
|
* @param trustedChannelResolver for identifying trusted connections that do
|
||||||
* not require SASL negotiation
|
* not require SASL negotiation
|
||||||
*/
|
*/
|
||||||
public SaslDataTransferClient(SaslPropertiesResolver saslPropsResolver,
|
public SaslDataTransferClient(Configuration conf,
|
||||||
|
SaslPropertiesResolver saslPropsResolver,
|
||||||
TrustedChannelResolver trustedChannelResolver) {
|
TrustedChannelResolver trustedChannelResolver) {
|
||||||
this(saslPropsResolver, trustedChannelResolver, null);
|
this(conf, saslPropsResolver, trustedChannelResolver, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates a new SaslDataTransferClient.
|
* Creates a new SaslDataTransferClient.
|
||||||
*
|
*
|
||||||
|
* @param conf the configuration
|
||||||
* @param saslPropsResolver for determining properties of SASL negotiation
|
* @param saslPropsResolver for determining properties of SASL negotiation
|
||||||
* @param trustedChannelResolver for identifying trusted connections that do
|
* @param trustedChannelResolver for identifying trusted connections that do
|
||||||
* not require SASL negotiation
|
* not require SASL negotiation
|
||||||
* @param fallbackToSimpleAuth checked on each attempt at general SASL
|
* @param fallbackToSimpleAuth checked on each attempt at general SASL
|
||||||
* handshake, if true forces use of simple auth
|
* handshake, if true forces use of simple auth
|
||||||
*/
|
*/
|
||||||
public SaslDataTransferClient(SaslPropertiesResolver saslPropsResolver,
|
public SaslDataTransferClient(Configuration conf,
|
||||||
|
SaslPropertiesResolver saslPropsResolver,
|
||||||
TrustedChannelResolver trustedChannelResolver,
|
TrustedChannelResolver trustedChannelResolver,
|
||||||
AtomicBoolean fallbackToSimpleAuth) {
|
AtomicBoolean fallbackToSimpleAuth) {
|
||||||
|
this.conf = conf;
|
||||||
this.fallbackToSimpleAuth = fallbackToSimpleAuth;
|
this.fallbackToSimpleAuth = fallbackToSimpleAuth;
|
||||||
this.saslPropsResolver = saslPropsResolver;
|
this.saslPropsResolver = saslPropsResolver;
|
||||||
this.trustedChannelResolver = trustedChannelResolver;
|
this.trustedChannelResolver = trustedChannelResolver;
|
||||||
|
@ -436,17 +446,38 @@ public class SaslDataTransferClient {
|
||||||
sendSaslMessage(out, new byte[0]);
|
sendSaslMessage(out, new byte[0]);
|
||||||
|
|
||||||
// step 1
|
// step 1
|
||||||
performSaslStep1(out, in, sasl);
|
|
||||||
|
|
||||||
// step 2 (client-side only)
|
|
||||||
byte[] remoteResponse = readSaslMessage(in);
|
byte[] remoteResponse = readSaslMessage(in);
|
||||||
byte[] localResponse = sasl.evaluateChallengeOrResponse(remoteResponse);
|
byte[] localResponse = sasl.evaluateChallengeOrResponse(remoteResponse);
|
||||||
|
List<CipherOption> cipherOptions = null;
|
||||||
|
if (requestedQopContainsPrivacy(saslProps)) {
|
||||||
|
// Negotiation cipher options
|
||||||
|
CipherOption option = new CipherOption(CipherSuite.AES_CTR_NOPADDING);
|
||||||
|
cipherOptions = Lists.newArrayListWithCapacity(1);
|
||||||
|
cipherOptions.add(option);
|
||||||
|
}
|
||||||
|
sendSaslMessageAndNegotiationCipherOptions(out, localResponse,
|
||||||
|
cipherOptions);
|
||||||
|
|
||||||
|
// step 2 (client-side only)
|
||||||
|
SaslResponseWithNegotiatedCipherOption response =
|
||||||
|
readSaslMessageAndNegotiatedCipherOption(in);
|
||||||
|
localResponse = sasl.evaluateChallengeOrResponse(response.payload);
|
||||||
assert localResponse == null;
|
assert localResponse == null;
|
||||||
|
|
||||||
// SASL handshake is complete
|
// SASL handshake is complete
|
||||||
checkSaslComplete(sasl, saslProps);
|
checkSaslComplete(sasl, saslProps);
|
||||||
|
|
||||||
return sasl.createStreamPair(out, in);
|
CipherOption cipherOption = null;
|
||||||
|
if (sasl.isNegotiatedQopPrivacy()) {
|
||||||
|
// Unwrap the negotiated cipher option
|
||||||
|
cipherOption = unwrap(response.cipherOption, sasl);
|
||||||
|
}
|
||||||
|
|
||||||
|
// If negotiated cipher option is not null, we will use it to create
|
||||||
|
// stream pair.
|
||||||
|
return cipherOption != null ? createStreamPair(
|
||||||
|
conf, cipherOption, underlyingOut, underlyingIn, false) :
|
||||||
|
sasl.createStreamPair(out, in);
|
||||||
} catch (IOException ioe) {
|
} catch (IOException ioe) {
|
||||||
sendGenericSaslErrorMessage(out, ioe.getMessage());
|
sendGenericSaslErrorMessage(out, ioe.getMessage());
|
||||||
throw ioe;
|
throw ioe;
|
||||||
|
|
|
@ -26,6 +26,7 @@ import java.io.DataOutputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
import java.io.OutputStream;
|
import java.io.OutputStream;
|
||||||
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
import javax.security.auth.callback.Callback;
|
import javax.security.auth.callback.Callback;
|
||||||
|
@ -39,6 +40,7 @@ import javax.security.sasl.SaslException;
|
||||||
|
|
||||||
import org.apache.commons.codec.binary.Base64;
|
import org.apache.commons.codec.binary.Base64;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.crypto.CipherOption;
|
||||||
import org.apache.hadoop.hdfs.net.Peer;
|
import org.apache.hadoop.hdfs.net.Peer;
|
||||||
import org.apache.hadoop.hdfs.protocol.DatanodeID;
|
import org.apache.hadoop.hdfs.protocol.DatanodeID;
|
||||||
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
|
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
|
||||||
|
@ -53,6 +55,7 @@ import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import com.google.common.base.Charsets;
|
import com.google.common.base.Charsets;
|
||||||
|
import com.google.common.collect.Lists;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Negotiates SASL for DataTransferProtocol on behalf of a server. There are
|
* Negotiates SASL for DataTransferProtocol on behalf of a server. There are
|
||||||
|
@ -351,17 +354,40 @@ public class SaslDataTransferServer {
|
||||||
}
|
}
|
||||||
try {
|
try {
|
||||||
// step 1
|
// step 1
|
||||||
performSaslStep1(out, in, sasl);
|
|
||||||
|
|
||||||
// step 2 (server-side only)
|
|
||||||
byte[] remoteResponse = readSaslMessage(in);
|
byte[] remoteResponse = readSaslMessage(in);
|
||||||
byte[] localResponse = sasl.evaluateChallengeOrResponse(remoteResponse);
|
byte[] localResponse = sasl.evaluateChallengeOrResponse(remoteResponse);
|
||||||
sendSaslMessage(out, localResponse);
|
sendSaslMessage(out, localResponse);
|
||||||
|
|
||||||
|
// step 2 (server-side only)
|
||||||
|
List<CipherOption> cipherOptions = Lists.newArrayList();
|
||||||
|
remoteResponse = readSaslMessageAndNegotiationCipherOptions(
|
||||||
|
in, cipherOptions);
|
||||||
|
localResponse = sasl.evaluateChallengeOrResponse(remoteResponse);
|
||||||
|
|
||||||
// SASL handshake is complete
|
// SASL handshake is complete
|
||||||
checkSaslComplete(sasl, saslProps);
|
checkSaslComplete(sasl, saslProps);
|
||||||
|
|
||||||
return sasl.createStreamPair(out, in);
|
CipherOption cipherOption = null;
|
||||||
|
if (sasl.isNegotiatedQopPrivacy()) {
|
||||||
|
// Negotiate a cipher option
|
||||||
|
cipherOption = negotiateCipherOption(dnConf.getConf(), cipherOptions);
|
||||||
|
if (cipherOption != null) {
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("Server using cipher suite " +
|
||||||
|
cipherOption.getCipherSuite().getName());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// If negotiated cipher option is not null, wrap it before sending.
|
||||||
|
sendSaslMessageAndNegotiatedCipherOption(out, localResponse,
|
||||||
|
wrap(cipherOption, sasl));
|
||||||
|
|
||||||
|
// If negotiated cipher option is not null, we will use it to create
|
||||||
|
// stream pair.
|
||||||
|
return cipherOption != null ? createStreamPair(
|
||||||
|
dnConf.getConf(), cipherOption, underlyingOut, underlyingIn, true) :
|
||||||
|
sasl.createStreamPair(out, in);
|
||||||
} catch (IOException ioe) {
|
} catch (IOException ioe) {
|
||||||
if (ioe instanceof SaslException &&
|
if (ioe instanceof SaslException &&
|
||||||
ioe.getCause() != null &&
|
ioe.getCause() != null &&
|
||||||
|
|
|
@ -130,6 +130,50 @@ class SaslParticipant {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* After successful SASL negotiation, returns whether it's QOP privacy
|
||||||
|
*
|
||||||
|
* @return boolean whether it's QOP privacy
|
||||||
|
*/
|
||||||
|
public boolean isNegotiatedQopPrivacy() {
|
||||||
|
String qop = getNegotiatedQop();
|
||||||
|
return qop != null && "auth-conf".equalsIgnoreCase(qop);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Wraps a byte array.
|
||||||
|
*
|
||||||
|
* @param bytes The array containing the bytes to wrap.
|
||||||
|
* @param off The starting position at the array
|
||||||
|
* @param len The number of bytes to wrap
|
||||||
|
* @return byte[] wrapped bytes
|
||||||
|
* @throws SaslException if the bytes cannot be successfully wrapped
|
||||||
|
*/
|
||||||
|
public byte[] wrap(byte[] bytes, int off, int len) throws SaslException {
|
||||||
|
if (saslClient != null) {
|
||||||
|
return saslClient.wrap(bytes, off, len);
|
||||||
|
} else {
|
||||||
|
return saslServer.wrap(bytes, off, len);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Unwraps a byte array.
|
||||||
|
*
|
||||||
|
* @param bytes The array containing the bytes to unwrap.
|
||||||
|
* @param off The starting position at the array
|
||||||
|
* @param len The number of bytes to unwrap
|
||||||
|
* @return byte[] unwrapped bytes
|
||||||
|
* @throws SaslException if the bytes cannot be successfully unwrapped
|
||||||
|
*/
|
||||||
|
public byte[] unwrap(byte[] bytes, int off, int len) throws SaslException {
|
||||||
|
if (saslClient != null) {
|
||||||
|
return saslClient.unwrap(bytes, off, len);
|
||||||
|
} else {
|
||||||
|
return saslServer.unwrap(bytes, off, len);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns true if SASL negotiation is complete.
|
* Returns true if SASL negotiation is complete.
|
||||||
*
|
*
|
||||||
|
|
|
@ -0,0 +1,33 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hdfs.protocol.datatransfer.sasl;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.crypto.CipherOption;
|
||||||
|
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
public class SaslResponseWithNegotiatedCipherOption {
|
||||||
|
final byte[] payload;
|
||||||
|
final CipherOption cipherOption;
|
||||||
|
|
||||||
|
public SaslResponseWithNegotiatedCipherOption(byte[] payload,
|
||||||
|
CipherOption cipherOption) {
|
||||||
|
this.payload = payload;
|
||||||
|
this.cipherOption = cipherOption;
|
||||||
|
}
|
||||||
|
}
|
|
@ -58,6 +58,7 @@ import org.apache.hadoop.hdfs.protocol.CacheDirectiveStats;
|
||||||
import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
|
import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
|
||||||
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
|
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
|
||||||
import org.apache.hadoop.hdfs.protocol.CachePoolStats;
|
import org.apache.hadoop.hdfs.protocol.CachePoolStats;
|
||||||
|
import org.apache.hadoop.crypto.CipherOption;
|
||||||
import org.apache.hadoop.crypto.CipherSuite;
|
import org.apache.hadoop.crypto.CipherSuite;
|
||||||
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
|
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
|
||||||
import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks;
|
import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks;
|
||||||
|
@ -128,6 +129,7 @@ import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockWithLocationsProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlocksWithLocationsProto;
|
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlocksWithLocationsProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.CheckpointCommandProto;
|
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.CheckpointCommandProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.CheckpointSignatureProto;
|
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.CheckpointSignatureProto;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.CipherOptionProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ContentSummaryProto;
|
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ContentSummaryProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.CorruptFileBlocksProto;
|
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.CorruptFileBlocksProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DataEncryptionKeyProto;
|
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DataEncryptionKeyProto;
|
||||||
|
@ -2686,6 +2688,83 @@ public class PBHelper {
|
||||||
builder.build()).build();
|
builder.build()).build();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static CipherOptionProto convert(CipherOption option) {
|
||||||
|
if (option != null) {
|
||||||
|
CipherOptionProto.Builder builder = CipherOptionProto.
|
||||||
|
newBuilder();
|
||||||
|
if (option.getCipherSuite() != null) {
|
||||||
|
builder.setSuite(convert(option.getCipherSuite()));
|
||||||
|
}
|
||||||
|
if (option.getInKey() != null) {
|
||||||
|
builder.setInKey(ByteString.copyFrom(option.getInKey()));
|
||||||
|
}
|
||||||
|
if (option.getInIv() != null) {
|
||||||
|
builder.setInIv(ByteString.copyFrom(option.getInIv()));
|
||||||
|
}
|
||||||
|
if (option.getOutKey() != null) {
|
||||||
|
builder.setOutKey(ByteString.copyFrom(option.getOutKey()));
|
||||||
|
}
|
||||||
|
if (option.getOutIv() != null) {
|
||||||
|
builder.setOutIv(ByteString.copyFrom(option.getOutIv()));
|
||||||
|
}
|
||||||
|
return builder.build();
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static CipherOption convert(CipherOptionProto proto) {
|
||||||
|
if (proto != null) {
|
||||||
|
CipherSuite suite = null;
|
||||||
|
if (proto.getSuite() != null) {
|
||||||
|
suite = convert(proto.getSuite());
|
||||||
|
}
|
||||||
|
byte[] inKey = null;
|
||||||
|
if (proto.getInKey() != null) {
|
||||||
|
inKey = proto.getInKey().toByteArray();
|
||||||
|
}
|
||||||
|
byte[] inIv = null;
|
||||||
|
if (proto.getInIv() != null) {
|
||||||
|
inIv = proto.getInIv().toByteArray();
|
||||||
|
}
|
||||||
|
byte[] outKey = null;
|
||||||
|
if (proto.getOutKey() != null) {
|
||||||
|
outKey = proto.getOutKey().toByteArray();
|
||||||
|
}
|
||||||
|
byte[] outIv = null;
|
||||||
|
if (proto.getOutIv() != null) {
|
||||||
|
outIv = proto.getOutIv().toByteArray();
|
||||||
|
}
|
||||||
|
return new CipherOption(suite, inKey, inIv, outKey, outIv);
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static List<CipherOptionProto> convertCipherOptions(
|
||||||
|
List<CipherOption> options) {
|
||||||
|
if (options != null) {
|
||||||
|
List<CipherOptionProto> protos =
|
||||||
|
Lists.newArrayListWithCapacity(options.size());
|
||||||
|
for (CipherOption option : options) {
|
||||||
|
protos.add(convert(option));
|
||||||
|
}
|
||||||
|
return protos;
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static List<CipherOption> convertCipherOptionProtos(
|
||||||
|
List<CipherOptionProto> protos) {
|
||||||
|
if (protos != null) {
|
||||||
|
List<CipherOption> options =
|
||||||
|
Lists.newArrayListWithCapacity(protos.size());
|
||||||
|
for (CipherOptionProto proto : protos) {
|
||||||
|
options.add(convert(proto));
|
||||||
|
}
|
||||||
|
return options;
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
public static CipherSuiteProto convert(CipherSuite suite) {
|
public static CipherSuiteProto convert(CipherSuite suite) {
|
||||||
switch (suite) {
|
switch (suite) {
|
||||||
case UNKNOWN:
|
case UNKNOWN:
|
||||||
|
|
|
@ -787,7 +787,7 @@ public class Dispatcher {
|
||||||
: Executors.newFixedThreadPool(dispatcherThreads);
|
: Executors.newFixedThreadPool(dispatcherThreads);
|
||||||
this.maxConcurrentMovesPerNode = maxConcurrentMovesPerNode;
|
this.maxConcurrentMovesPerNode = maxConcurrentMovesPerNode;
|
||||||
|
|
||||||
this.saslClient = new SaslDataTransferClient(
|
this.saslClient = new SaslDataTransferClient(conf,
|
||||||
DataTransferSaslUtil.getSaslPropertiesResolver(conf),
|
DataTransferSaslUtil.getSaslPropertiesResolver(conf),
|
||||||
TrustedChannelResolver.getInstance(conf), nnc.fallbackToSimpleAuth);
|
TrustedChannelResolver.getInstance(conf), nnc.fallbackToSimpleAuth);
|
||||||
}
|
}
|
||||||
|
|
|
@ -64,6 +64,7 @@ import org.apache.hadoop.security.SaslPropertiesResolver;
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public class DNConf {
|
public class DNConf {
|
||||||
|
final Configuration conf;
|
||||||
final int socketTimeout;
|
final int socketTimeout;
|
||||||
final int socketWriteTimeout;
|
final int socketWriteTimeout;
|
||||||
final int socketKeepaliveTimeout;
|
final int socketKeepaliveTimeout;
|
||||||
|
@ -100,6 +101,7 @@ public class DNConf {
|
||||||
final long maxLockedMemory;
|
final long maxLockedMemory;
|
||||||
|
|
||||||
public DNConf(Configuration conf) {
|
public DNConf(Configuration conf) {
|
||||||
|
this.conf = conf;
|
||||||
socketTimeout = conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY,
|
socketTimeout = conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY,
|
||||||
HdfsServerConstants.READ_TIMEOUT);
|
HdfsServerConstants.READ_TIMEOUT);
|
||||||
socketWriteTimeout = conf.getInt(DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY,
|
socketWriteTimeout = conf.getInt(DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY,
|
||||||
|
@ -198,6 +200,15 @@ public class DNConf {
|
||||||
return this.minimumNameNodeVersion;
|
return this.minimumNameNodeVersion;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the configuration.
|
||||||
|
*
|
||||||
|
* @return Configuration the configuration
|
||||||
|
*/
|
||||||
|
public Configuration getConf() {
|
||||||
|
return conf;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns true if encryption enabled for DataTransferProtocol.
|
* Returns true if encryption enabled for DataTransferProtocol.
|
||||||
*
|
*
|
||||||
|
|
|
@ -1077,8 +1077,8 @@ public class DataNode extends ReconfigurableBase
|
||||||
// Create the ReadaheadPool from the DataNode context so we can
|
// Create the ReadaheadPool from the DataNode context so we can
|
||||||
// exit without having to explicitly shutdown its thread pool.
|
// exit without having to explicitly shutdown its thread pool.
|
||||||
readaheadPool = ReadaheadPool.getInstance();
|
readaheadPool = ReadaheadPool.getInstance();
|
||||||
saslClient = new SaslDataTransferClient(dnConf.saslPropsResolver,
|
saslClient = new SaslDataTransferClient(dnConf.conf,
|
||||||
dnConf.trustedChannelResolver);
|
dnConf.saslPropsResolver, dnConf.trustedChannelResolver);
|
||||||
saslServer = new SaslDataTransferServer(dnConf, blockPoolTokenSecretManager);
|
saslServer = new SaslDataTransferServer(dnConf, blockPoolTokenSecretManager);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -42,6 +42,7 @@ message DataTransferEncryptorMessageProto {
|
||||||
required DataTransferEncryptorStatus status = 1;
|
required DataTransferEncryptorStatus status = 1;
|
||||||
optional bytes payload = 2;
|
optional bytes payload = 2;
|
||||||
optional string message = 3;
|
optional string message = 3;
|
||||||
|
repeated CipherOptionProto cipherOption = 4;
|
||||||
}
|
}
|
||||||
|
|
||||||
message BaseHeaderProto {
|
message BaseHeaderProto {
|
||||||
|
|
|
@ -264,6 +264,17 @@ message ZoneEncryptionInfoProto {
|
||||||
required string keyName = 3;
|
required string keyName = 3;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Cipher option
|
||||||
|
*/
|
||||||
|
message CipherOptionProto {
|
||||||
|
required CipherSuiteProto suite = 1;
|
||||||
|
optional bytes inKey = 2;
|
||||||
|
optional bytes inIv = 3;
|
||||||
|
optional bytes outKey = 4;
|
||||||
|
optional bytes outIv = 5;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A set of file blocks and their locations.
|
* A set of file blocks and their locations.
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -1514,6 +1514,18 @@
|
||||||
the configured JCE default on the system is used (usually 3DES.) It is
|
the configured JCE default on the system is used (usually 3DES.) It is
|
||||||
widely believed that 3DES is more cryptographically secure, but RC4 is
|
widely believed that 3DES is more cryptographically secure, but RC4 is
|
||||||
substantially faster.
|
substantially faster.
|
||||||
|
|
||||||
|
Note that if AES is supported by both the client and server then this
|
||||||
|
encryption algorithm will only be used to initially transfer keys for AES.
|
||||||
|
</description>
|
||||||
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<name>dfs.encrypt.data.transfer.cipher.key.bitlength</name>
|
||||||
|
<value>128</value>
|
||||||
|
<description>
|
||||||
|
The key bitlength negotiated by dfsclient and datanode for encryption.
|
||||||
|
This value may be set to either 128, 192 or 256.
|
||||||
</description>
|
</description>
|
||||||
</property>
|
</property>
|
||||||
|
|
||||||
|
|
|
@ -37,11 +37,15 @@ import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||||
import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver;
|
import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferServer;
|
||||||
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
|
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
|
||||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
|
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||||
import org.apache.hadoop.test.GenericTestUtils;
|
import org.apache.hadoop.test.GenericTestUtils;
|
||||||
import org.apache.hadoop.test.GenericTestUtils.LogCapturer;
|
import org.apache.hadoop.test.GenericTestUtils.LogCapturer;
|
||||||
|
import org.apache.log4j.Level;
|
||||||
|
import org.apache.log4j.LogManager;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.junit.runner.RunWith;
|
import org.junit.runner.RunWith;
|
||||||
import org.junit.runners.Parameterized;
|
import org.junit.runners.Parameterized;
|
||||||
|
@ -50,6 +54,10 @@ import org.mockito.Mockito;
|
||||||
|
|
||||||
@RunWith(Parameterized.class)
|
@RunWith(Parameterized.class)
|
||||||
public class TestEncryptedTransfer {
|
public class TestEncryptedTransfer {
|
||||||
|
{
|
||||||
|
LogManager.getLogger(SaslDataTransferServer.class).setLevel(Level.DEBUG);
|
||||||
|
LogManager.getLogger(DataTransferSaslUtil.class).setLevel(Level.DEBUG);
|
||||||
|
}
|
||||||
|
|
||||||
@Parameters
|
@Parameters
|
||||||
public static Collection<Object[]> data() {
|
public static Collection<Object[]> data() {
|
||||||
|
@ -111,9 +119,28 @@ public class TestEncryptedTransfer {
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
fs = getFileSystem(conf);
|
fs = getFileSystem(conf);
|
||||||
assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
|
LogCapturer logs = GenericTestUtils.LogCapturer.captureLogs(
|
||||||
assertEquals(checksum, fs.getFileChecksum(TEST_PATH));
|
LogFactory.getLog(SaslDataTransferServer.class));
|
||||||
|
LogCapturer logs1 = GenericTestUtils.LogCapturer.captureLogs(
|
||||||
|
LogFactory.getLog(DataTransferSaslUtil.class));
|
||||||
|
try {
|
||||||
|
assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
|
||||||
|
assertEquals(checksum, fs.getFileChecksum(TEST_PATH));
|
||||||
|
} finally {
|
||||||
|
logs.stopCapturing();
|
||||||
|
logs1.stopCapturing();
|
||||||
|
}
|
||||||
|
|
||||||
fs.close();
|
fs.close();
|
||||||
|
|
||||||
|
if (resolverClazz != null && !resolverClazz.endsWith("TestTrustedChannelResolver")){
|
||||||
|
// Test client and server negotiate cipher option
|
||||||
|
GenericTestUtils.assertMatches(logs.getOutput(),
|
||||||
|
"Server using cipher suite");
|
||||||
|
// Check the IOStreamPair
|
||||||
|
GenericTestUtils.assertMatches(logs1.getOutput(),
|
||||||
|
"Creating IOStreamPair of CryptoInputStream and CryptoOutputStream.");
|
||||||
|
}
|
||||||
} finally {
|
} finally {
|
||||||
if (cluster != null) {
|
if (cluster != null) {
|
||||||
cluster.shutdown();
|
cluster.shutdown();
|
||||||
|
@ -403,9 +430,28 @@ public class TestEncryptedTransfer {
|
||||||
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDns).build();
|
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDns).build();
|
||||||
|
|
||||||
FileSystem fs = getFileSystem(conf);
|
FileSystem fs = getFileSystem(conf);
|
||||||
writeTestDataToFile(fs);
|
|
||||||
|
LogCapturer logs = GenericTestUtils.LogCapturer.captureLogs(
|
||||||
|
LogFactory.getLog(SaslDataTransferServer.class));
|
||||||
|
LogCapturer logs1 = GenericTestUtils.LogCapturer.captureLogs(
|
||||||
|
LogFactory.getLog(DataTransferSaslUtil.class));
|
||||||
|
try {
|
||||||
|
writeTestDataToFile(fs);
|
||||||
|
} finally {
|
||||||
|
logs.stopCapturing();
|
||||||
|
logs1.stopCapturing();
|
||||||
|
}
|
||||||
assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
|
assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
|
||||||
fs.close();
|
fs.close();
|
||||||
|
|
||||||
|
if (resolverClazz != null && !resolverClazz.endsWith("TestTrustedChannelResolver")){
|
||||||
|
// Test client and server negotiate cipher option
|
||||||
|
GenericTestUtils.assertMatches(logs.getOutput(),
|
||||||
|
"Server using cipher suite");
|
||||||
|
// Check the IOStreamPair
|
||||||
|
GenericTestUtils.assertMatches(logs1.getOutput(),
|
||||||
|
"Creating IOStreamPair of CryptoInputStream and CryptoOutputStream.");
|
||||||
|
}
|
||||||
} finally {
|
} finally {
|
||||||
if (cluster != null) {
|
if (cluster != null) {
|
||||||
cluster.shutdown();
|
cluster.shutdown();
|
||||||
|
|
Loading…
Reference in New Issue