diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CipherOption.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CipherOption.java new file mode 100644 index 00000000000..6a8d8d0f06c --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CipherOption.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.crypto; + +import org.apache.hadoop.classification.InterfaceAudience; + +/** + * Used between client and server to negotiate the + * cipher suite, key and iv. + */ +@InterfaceAudience.Private +public class CipherOption { + private final CipherSuite suite; + private final byte[] inKey; + private final byte[] inIv; + private final byte[] outKey; + private final byte[] outIv; + + public CipherOption(CipherSuite suite) { + this(suite, null, null, null, null); + } + + public CipherOption(CipherSuite suite, byte[] inKey, byte[] inIv, + byte[] outKey, byte[] outIv) { + this.suite = suite; + this.inKey = inKey; + this.inIv = inIv; + this.outKey = outKey; + this.outIv = outIv; + } + + public CipherSuite getCipherSuite() { + return suite; + } + + public byte[] getInKey() { + return inKey; + } + + public byte[] getInIv() { + return inIv; + } + + public byte[] getOutKey() { + return outKey; + } + + public byte[] getOutIv() { + return outIv; + } +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoInputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoInputStream.java index 68e969737c5..4b5356384d5 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoInputStream.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoInputStream.java @@ -23,6 +23,7 @@ import java.io.FilterInputStream; import java.io.IOException; import java.io.InputStream; import java.nio.ByteBuffer; +import java.nio.channels.ReadableByteChannel; import java.security.GeneralSecurityException; import java.util.EnumSet; import java.util.Queue; @@ -57,7 +58,8 @@ import com.google.common.base.Preconditions; @InterfaceStability.Evolving public class CryptoInputStream extends FilterInputStream implements Seekable, PositionedReadable, ByteBufferReadable, HasFileDescriptor, - CanSetDropBehind, CanSetReadahead, HasEnhancedByteBufferAccess { + CanSetDropBehind, CanSetReadahead, HasEnhancedByteBufferAccess, + ReadableByteChannel { private static final byte[] oneByteBuf = new byte[1]; private final CryptoCodec codec; private final Decryptor decryptor; @@ -92,6 +94,8 @@ public class CryptoInputStream extends FilterInputStream implements private final byte[] key; private final byte[] initIV; private byte[] iv; + private final boolean isByteBufferReadable; + private final boolean isReadableByteChannel; /** DirectBuffer pool */ private final Queue bufferPool = @@ -115,6 +119,8 @@ public class CryptoInputStream extends FilterInputStream implements this.initIV = iv.clone(); this.iv = iv.clone(); this.streamOffset = streamOffset; + isByteBufferReadable = in instanceof ByteBufferReadable; + isReadableByteChannel = in instanceof ReadableByteChannel; inBuffer = ByteBuffer.allocateDirect(this.bufferSize); outBuffer = ByteBuffer.allocateDirect(this.bufferSize); decryptor = getDecryptor(); @@ -165,9 +171,11 @@ public class CryptoInputStream extends FilterInputStream implements * it can avoid bytes copy. */ if (usingByteBufferRead == null) { - if (in instanceof ByteBufferReadable) { + if (isByteBufferReadable || isReadableByteChannel) { try { - n = ((ByteBufferReadable) in).read(inBuffer); + n = isByteBufferReadable ? + ((ByteBufferReadable) in).read(inBuffer) : + ((ReadableByteChannel) in).read(inBuffer); usingByteBufferRead = Boolean.TRUE; } catch (UnsupportedOperationException e) { usingByteBufferRead = Boolean.FALSE; @@ -180,7 +188,8 @@ public class CryptoInputStream extends FilterInputStream implements } } else { if (usingByteBufferRead) { - n = ((ByteBufferReadable) in).read(inBuffer); + n = isByteBufferReadable ? ((ByteBufferReadable) in).read(inBuffer) : + ((ReadableByteChannel) in).read(inBuffer); } else { n = readFromUnderlyingStream(inBuffer); } @@ -450,7 +459,7 @@ public class CryptoInputStream extends FilterInputStream implements @Override public int read(ByteBuffer buf) throws IOException { checkStream(); - if (in instanceof ByteBufferReadable) { + if (isByteBufferReadable || isReadableByteChannel) { final int unread = outBuffer.remaining(); if (unread > 0) { // Have unread decrypted data in buffer. int toRead = buf.remaining(); @@ -466,7 +475,8 @@ public class CryptoInputStream extends FilterInputStream implements } final int pos = buf.position(); - final int n = ((ByteBufferReadable) in).read(buf); + final int n = isByteBufferReadable ? ((ByteBufferReadable) in).read(buf) : + ((ReadableByteChannel) in).read(buf); if (n > 0) { streamOffset += n; // Read n bytes decrypt(buf, n, pos); @@ -481,10 +491,22 @@ public class CryptoInputStream extends FilterInputStream implements return unread; } } + } else { + int n = 0; + if (buf.hasArray()) { + n = read(buf.array(), buf.position(), buf.remaining()); + if (n > 0) { + buf.position(buf.position() + n); + } + } else { + byte[] tmp = new byte[buf.remaining()]; + n = read(tmp); + if (n > 0) { + buf.put(tmp, 0, n); + } + } + return n; } - - throw new UnsupportedOperationException("ByteBuffer read unsupported " + - "by input stream."); } /** @@ -686,4 +708,9 @@ public class CryptoInputStream extends FilterInputStream implements decryptorPool.add(decryptor); } } + + @Override + public boolean isOpen() { + return !closed; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 3831fe47036..20590ce73db 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -407,6 +407,8 @@ Release 2.6.0 - UNRELEASED HDFS-7122. Use of ThreadLocal results in poor block placement. (wang) + HDFS-6606. Optimize HDFS Encrypted Transport performance. (yliu) + BUG FIXES HDFS-6823. dfs.web.authentication.kerberos.principal shows up in logs for diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java index d0583f9dbe7..1f66b206792 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java @@ -693,7 +693,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, this.initThreadsNumForHedgedReads(numThreads); } this.saslClient = new SaslDataTransferClient( - DataTransferSaslUtil.getSaslPropertiesResolver(conf), + conf, DataTransferSaslUtil.getSaslPropertiesResolver(conf), TrustedChannelResolver.getInstance(conf), nnFallbackToSimpleAuth); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index 50ea800ccd1..051574a7e27 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -606,6 +606,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys { // Security-related configs public static final String DFS_ENCRYPT_DATA_TRANSFER_KEY = "dfs.encrypt.data.transfer"; public static final boolean DFS_ENCRYPT_DATA_TRANSFER_DEFAULT = false; + public static final String DFS_ENCRYPT_DATA_TRANSFER_CIPHER_KEY_BITLENGTH_KEY = "dfs.encrypt.data.transfer.cipher.key.bitlength"; + public static final int DFS_ENCRYPT_DATA_TRANSFER_CIPHER_KEY_BITLENGTH_DEFAULT = 128; public static final String DFS_DATA_ENCRYPTION_ALGORITHM_KEY = "dfs.encrypt.data.transfer.algorithm"; public static final String DFS_TRUSTEDCHANNEL_RESOLVER_CLASS = "dfs.trustedchannel.resolver.class"; public static final String DFS_DATA_TRANSFER_PROTECTION_KEY = "dfs.data.transfer.protection"; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/DataTransferSaslUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/DataTransferSaslUtil.java index 81d740f2353..2d5e13c4a12 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/DataTransferSaslUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/DataTransferSaslUtil.java @@ -21,6 +21,8 @@ import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_RPC_PROT import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_SASL_PROPS_RESOLVER_CLASS; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_TRANSFER_SASL_PROPS_RESOLVER_CLASS_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_KEY_BITLENGTH_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_KEY_BITLENGTH_DEFAULT; import static org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed; import java.io.IOException; @@ -28,6 +30,7 @@ import java.io.InputStream; import java.io.OutputStream; import java.net.InetAddress; import java.util.Arrays; +import java.util.List; import java.util.Map; import java.util.Set; import javax.security.sasl.Sasl; @@ -35,10 +38,18 @@ import javax.security.sasl.Sasl; import org.apache.commons.codec.binary.Base64; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.crypto.CipherOption; +import org.apache.hadoop.crypto.CipherSuite; +import org.apache.hadoop.crypto.CryptoCodec; +import org.apache.hadoop.crypto.CryptoInputStream; +import org.apache.hadoop.crypto.CryptoOutputStream; import org.apache.hadoop.hdfs.net.Peer; +import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair; import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto.DataTransferEncryptorStatus; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.CipherOptionProto; +import org.apache.hadoop.hdfs.protocolPB.PBHelper; import org.apache.hadoop.security.SaslPropertiesResolver; import org.apache.hadoop.security.SaslRpcServer.QualityOfProtection; import org.slf4j.Logger; @@ -95,6 +106,19 @@ public final class DataTransferSaslUtil { "requested = %s, negotiated = %s", requestedQop, negotiatedQop)); } } + + /** + * Check whether requested SASL Qop contains privacy. + * + * @param saslProps properties of SASL negotiation + * @return boolean true if privacy exists + */ + public static boolean requestedQopContainsPrivacy( + Map saslProps) { + Set requestedQop = ImmutableSet.copyOf(Arrays.asList( + saslProps.get(Sasl.QOP).split(","))); + return requestedQop.contains("auth-conf"); + } /** * Creates SASL properties required for an encrypted SASL negotiation. @@ -176,20 +200,6 @@ public final class DataTransferSaslUtil { return resolver; } - /** - * Performs the first step of SASL negotiation. - * - * @param out connection output stream - * @param in connection input stream - * @param sasl participant - */ - public static void performSaslStep1(OutputStream out, InputStream in, - SaslParticipant sasl) throws IOException { - byte[] remoteResponse = readSaslMessage(in); - byte[] localResponse = sasl.evaluateChallengeOrResponse(remoteResponse); - sendSaslMessage(out, localResponse); - } - /** * Reads a SASL negotiation message. * @@ -208,6 +218,124 @@ public final class DataTransferSaslUtil { return proto.getPayload().toByteArray(); } } + + /** + * Reads a SASL negotiation message and negotiation cipher options. + * + * @param in stream to read + * @param cipherOptions list to store negotiation cipher options + * @return byte[] SASL negotiation message + * @throws IOException for any error + */ + public static byte[] readSaslMessageAndNegotiationCipherOptions( + InputStream in, List cipherOptions) throws IOException { + DataTransferEncryptorMessageProto proto = + DataTransferEncryptorMessageProto.parseFrom(vintPrefixed(in)); + if (proto.getStatus() == DataTransferEncryptorStatus.ERROR_UNKNOWN_KEY) { + throw new InvalidEncryptionKeyException(proto.getMessage()); + } else if (proto.getStatus() == DataTransferEncryptorStatus.ERROR) { + throw new IOException(proto.getMessage()); + } else { + List optionProtos = proto.getCipherOptionList(); + if (optionProtos != null) { + for (CipherOptionProto optionProto : optionProtos) { + cipherOptions.add(PBHelper.convert(optionProto)); + } + } + return proto.getPayload().toByteArray(); + } + } + + /** + * Negotiate a cipher option which server supports. + * + * @param options the cipher options which client supports + * @return CipherOption negotiated cipher option + */ + public static CipherOption negotiateCipherOption(Configuration conf, + List options) { + if (options != null) { + for (CipherOption option : options) { + // Currently we support AES/CTR/NoPadding + CipherSuite suite = option.getCipherSuite(); + if (suite == CipherSuite.AES_CTR_NOPADDING) { + int keyLen = conf.getInt( + DFS_ENCRYPT_DATA_TRANSFER_CIPHER_KEY_BITLENGTH_KEY, + DFS_ENCRYPT_DATA_TRANSFER_CIPHER_KEY_BITLENGTH_DEFAULT) / 8; + CryptoCodec codec = CryptoCodec.getInstance(conf, suite); + byte[] inKey = new byte[keyLen]; + byte[] inIv = new byte[suite.getAlgorithmBlockSize()]; + byte[] outKey = new byte[keyLen]; + byte[] outIv = new byte[suite.getAlgorithmBlockSize()]; + codec.generateSecureRandom(inKey); + codec.generateSecureRandom(inIv); + codec.generateSecureRandom(outKey); + codec.generateSecureRandom(outIv); + return new CipherOption(suite, inKey, inIv, outKey, outIv); + } + } + } + return null; + } + + /** + * Send SASL message and negotiated cipher option to client. + * + * @param out stream to receive message + * @param payload to send + * @param option negotiated cipher option + * @throws IOException for any error + */ + public static void sendSaslMessageAndNegotiatedCipherOption( + OutputStream out, byte[] payload, CipherOption option) + throws IOException { + DataTransferEncryptorMessageProto.Builder builder = + DataTransferEncryptorMessageProto.newBuilder(); + + builder.setStatus(DataTransferEncryptorStatus.SUCCESS); + if (payload != null) { + builder.setPayload(ByteString.copyFrom(payload)); + } + if (option != null) { + builder.addCipherOption(PBHelper.convert(option)); + } + + DataTransferEncryptorMessageProto proto = builder.build(); + proto.writeDelimitedTo(out); + out.flush(); + } + + /** + * Create IOStreamPair of {@link org.apache.hadoop.crypto.CryptoInputStream} + * and {@link org.apache.hadoop.crypto.CryptoOutputStream} + * + * @param conf the configuration + * @param cipherOption negotiated cipher option + * @param out underlying output stream + * @param in underlying input stream + * @param isServer is server side + * @return IOStreamPair the stream pair + * @throws IOException for any error + */ + public static IOStreamPair createStreamPair(Configuration conf, + CipherOption cipherOption, OutputStream out, InputStream in, + boolean isServer) throws IOException { + if (LOG.isDebugEnabled()) { + LOG.debug("Creating IOStreamPair of CryptoInputStream and " + + "CryptoOutputStream."); + } + CryptoCodec codec = CryptoCodec.getInstance(conf, + cipherOption.getCipherSuite()); + byte[] inKey = cipherOption.getInKey(); + byte[] inIv = cipherOption.getInIv(); + byte[] outKey = cipherOption.getOutKey(); + byte[] outIv = cipherOption.getOutIv(); + InputStream cIn = new CryptoInputStream(in, codec, + isServer ? inKey : outKey, isServer ? inIv : outIv); + OutputStream cOut = new CryptoOutputStream(out, codec, + isServer ? outKey : inKey, isServer ? outIv : inIv); + return new IOStreamPair(cIn, cOut); + } /** * Sends a SASL negotiation message indicating an error. @@ -232,6 +360,116 @@ public final class DataTransferSaslUtil { throws IOException { sendSaslMessage(out, DataTransferEncryptorStatus.SUCCESS, payload, null); } + + /** + * Send a SASL negotiation message and negotiation cipher options to server. + * + * @param out stream to receive message + * @param payload to send + * @param options cipher options to negotiate + * @throws IOException for any error + */ + public static void sendSaslMessageAndNegotiationCipherOptions( + OutputStream out, byte[] payload, List options) + throws IOException { + DataTransferEncryptorMessageProto.Builder builder = + DataTransferEncryptorMessageProto.newBuilder(); + + builder.setStatus(DataTransferEncryptorStatus.SUCCESS); + if (payload != null) { + builder.setPayload(ByteString.copyFrom(payload)); + } + if (options != null) { + builder.addAllCipherOption(PBHelper.convertCipherOptions(options)); + } + + DataTransferEncryptorMessageProto proto = builder.build(); + proto.writeDelimitedTo(out); + out.flush(); + } + + /** + * Read SASL message and negotiated cipher option from server. + * + * @param in stream to read + * @return SaslResponseWithNegotiatedCipherOption SASL message and + * negotiated cipher option + * @throws IOException for any error + */ + public static SaslResponseWithNegotiatedCipherOption + readSaslMessageAndNegotiatedCipherOption(InputStream in) + throws IOException { + DataTransferEncryptorMessageProto proto = + DataTransferEncryptorMessageProto.parseFrom(vintPrefixed(in)); + if (proto.getStatus() == DataTransferEncryptorStatus.ERROR_UNKNOWN_KEY) { + throw new InvalidEncryptionKeyException(proto.getMessage()); + } else if (proto.getStatus() == DataTransferEncryptorStatus.ERROR) { + throw new IOException(proto.getMessage()); + } else { + byte[] response = proto.getPayload().toByteArray(); + List options = PBHelper.convertCipherOptionProtos( + proto.getCipherOptionList()); + CipherOption option = null; + if (options != null && !options.isEmpty()) { + option = options.get(0); + } + return new SaslResponseWithNegotiatedCipherOption(response, option); + } + } + + /** + * Encrypt the key and iv of the negotiated cipher option. + * + * @param option negotiated cipher option + * @param sasl SASL participant representing server + * @return CipherOption negotiated cipher option which contains the + * encrypted key and iv + * @throws IOException for any error + */ + public static CipherOption wrap(CipherOption option, SaslParticipant sasl) + throws IOException { + if (option != null) { + byte[] inKey = option.getInKey(); + if (inKey != null) { + inKey = sasl.wrap(inKey, 0, inKey.length); + } + byte[] outKey = option.getOutKey(); + if (outKey != null) { + outKey = sasl.wrap(outKey, 0, outKey.length); + } + return new CipherOption(option.getCipherSuite(), inKey, option.getInIv(), + outKey, option.getOutIv()); + } + + return null; + } + + /** + * Decrypt the key and iv of the negotiated cipher option. + * + * @param option negotiated cipher option + * @param sasl SASL participant representing client + * @return CipherOption negotiated cipher option which contains the + * decrypted key and iv + * @throws IOException for any error + */ + public static CipherOption unwrap(CipherOption option, SaslParticipant sasl) + throws IOException { + if (option != null) { + byte[] inKey = option.getInKey(); + if (inKey != null) { + inKey = sasl.unwrap(inKey, 0, inKey.length); + } + byte[] outKey = option.getOutKey(); + if (outKey != null) { + outKey = sasl.unwrap(outKey, 0, outKey.length); + } + return new CipherOption(option.getCipherSuite(), inKey, option.getInIv(), + outKey, option.getOutIv()); + } + + return null; + } /** * Sends a SASL negotiation message. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferClient.java index 9df9929dfd4..cfcc91fa2be 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferClient.java @@ -17,7 +17,6 @@ */ package org.apache.hadoop.hdfs.protocol.datatransfer.sasl; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY; import static org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil.*; import java.io.DataInputStream; @@ -27,6 +26,7 @@ import java.io.InputStream; import java.io.OutputStream; import java.net.InetAddress; import java.net.Socket; +import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; @@ -40,6 +40,9 @@ import javax.security.sasl.RealmChoiceCallback; import org.apache.commons.codec.binary.Base64; import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.crypto.CipherOption; +import org.apache.hadoop.crypto.CipherSuite; import org.apache.hadoop.hdfs.net.EncryptedPeer; import org.apache.hadoop.hdfs.net.Peer; import org.apache.hadoop.hdfs.protocol.DatanodeID; @@ -54,6 +57,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.base.Charsets; +import com.google.common.collect.Lists; /** * Negotiates SASL for DataTransferProtocol on behalf of a client. There are @@ -72,6 +76,7 @@ public class SaslDataTransferClient { private static final Logger LOG = LoggerFactory.getLogger( SaslDataTransferClient.class); + private final Configuration conf; private final AtomicBoolean fallbackToSimpleAuth; private final SaslPropertiesResolver saslPropsResolver; private final TrustedChannelResolver trustedChannelResolver; @@ -82,27 +87,32 @@ public class SaslDataTransferClient { * simple auth. For intra-cluster connections between data nodes in the same * cluster, we can assume that all run under the same security configuration. * + * @param conf the configuration * @param saslPropsResolver for determining properties of SASL negotiation * @param trustedChannelResolver for identifying trusted connections that do * not require SASL negotiation */ - public SaslDataTransferClient(SaslPropertiesResolver saslPropsResolver, + public SaslDataTransferClient(Configuration conf, + SaslPropertiesResolver saslPropsResolver, TrustedChannelResolver trustedChannelResolver) { - this(saslPropsResolver, trustedChannelResolver, null); + this(conf, saslPropsResolver, trustedChannelResolver, null); } /** * Creates a new SaslDataTransferClient. * + * @param conf the configuration * @param saslPropsResolver for determining properties of SASL negotiation * @param trustedChannelResolver for identifying trusted connections that do * not require SASL negotiation * @param fallbackToSimpleAuth checked on each attempt at general SASL * handshake, if true forces use of simple auth */ - public SaslDataTransferClient(SaslPropertiesResolver saslPropsResolver, + public SaslDataTransferClient(Configuration conf, + SaslPropertiesResolver saslPropsResolver, TrustedChannelResolver trustedChannelResolver, AtomicBoolean fallbackToSimpleAuth) { + this.conf = conf; this.fallbackToSimpleAuth = fallbackToSimpleAuth; this.saslPropsResolver = saslPropsResolver; this.trustedChannelResolver = trustedChannelResolver; @@ -436,17 +446,38 @@ public class SaslDataTransferClient { sendSaslMessage(out, new byte[0]); // step 1 - performSaslStep1(out, in, sasl); - - // step 2 (client-side only) byte[] remoteResponse = readSaslMessage(in); byte[] localResponse = sasl.evaluateChallengeOrResponse(remoteResponse); + List cipherOptions = null; + if (requestedQopContainsPrivacy(saslProps)) { + // Negotiation cipher options + CipherOption option = new CipherOption(CipherSuite.AES_CTR_NOPADDING); + cipherOptions = Lists.newArrayListWithCapacity(1); + cipherOptions.add(option); + } + sendSaslMessageAndNegotiationCipherOptions(out, localResponse, + cipherOptions); + + // step 2 (client-side only) + SaslResponseWithNegotiatedCipherOption response = + readSaslMessageAndNegotiatedCipherOption(in); + localResponse = sasl.evaluateChallengeOrResponse(response.payload); assert localResponse == null; // SASL handshake is complete checkSaslComplete(sasl, saslProps); - return sasl.createStreamPair(out, in); + CipherOption cipherOption = null; + if (sasl.isNegotiatedQopPrivacy()) { + // Unwrap the negotiated cipher option + cipherOption = unwrap(response.cipherOption, sasl); + } + + // If negotiated cipher option is not null, we will use it to create + // stream pair. + return cipherOption != null ? createStreamPair( + conf, cipherOption, underlyingOut, underlyingIn, false) : + sasl.createStreamPair(out, in); } catch (IOException ioe) { sendGenericSaslErrorMessage(out, ioe.getMessage()); throw ioe; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferServer.java index 2b82c82f26a..005856d8ce8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferServer.java @@ -26,6 +26,7 @@ import java.io.DataOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.util.List; import java.util.Map; import javax.security.auth.callback.Callback; @@ -39,6 +40,7 @@ import javax.security.sasl.SaslException; import org.apache.commons.codec.binary.Base64; import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.crypto.CipherOption; import org.apache.hadoop.hdfs.net.Peer; import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair; @@ -53,6 +55,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.base.Charsets; +import com.google.common.collect.Lists; /** * Negotiates SASL for DataTransferProtocol on behalf of a server. There are @@ -351,17 +354,40 @@ public class SaslDataTransferServer { } try { // step 1 - performSaslStep1(out, in, sasl); - - // step 2 (server-side only) byte[] remoteResponse = readSaslMessage(in); byte[] localResponse = sasl.evaluateChallengeOrResponse(remoteResponse); sendSaslMessage(out, localResponse); + // step 2 (server-side only) + List cipherOptions = Lists.newArrayList(); + remoteResponse = readSaslMessageAndNegotiationCipherOptions( + in, cipherOptions); + localResponse = sasl.evaluateChallengeOrResponse(remoteResponse); + // SASL handshake is complete checkSaslComplete(sasl, saslProps); - return sasl.createStreamPair(out, in); + CipherOption cipherOption = null; + if (sasl.isNegotiatedQopPrivacy()) { + // Negotiate a cipher option + cipherOption = negotiateCipherOption(dnConf.getConf(), cipherOptions); + if (cipherOption != null) { + if (LOG.isDebugEnabled()) { + LOG.debug("Server using cipher suite " + + cipherOption.getCipherSuite().getName()); + } + } + } + + // If negotiated cipher option is not null, wrap it before sending. + sendSaslMessageAndNegotiatedCipherOption(out, localResponse, + wrap(cipherOption, sasl)); + + // If negotiated cipher option is not null, we will use it to create + // stream pair. + return cipherOption != null ? createStreamPair( + dnConf.getConf(), cipherOption, underlyingOut, underlyingIn, true) : + sasl.createStreamPair(out, in); } catch (IOException ioe) { if (ioe instanceof SaslException && ioe.getCause() != null && diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslParticipant.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslParticipant.java index 106e297d5c8..f14a075192d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslParticipant.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslParticipant.java @@ -129,6 +129,50 @@ class SaslParticipant { return (String) saslServer.getNegotiatedProperty(Sasl.QOP); } } + + /** + * After successful SASL negotiation, returns whether it's QOP privacy + * + * @return boolean whether it's QOP privacy + */ + public boolean isNegotiatedQopPrivacy() { + String qop = getNegotiatedQop(); + return qop != null && "auth-conf".equalsIgnoreCase(qop); + } + + /** + * Wraps a byte array. + * + * @param bytes The array containing the bytes to wrap. + * @param off The starting position at the array + * @param len The number of bytes to wrap + * @return byte[] wrapped bytes + * @throws SaslException if the bytes cannot be successfully wrapped + */ + public byte[] wrap(byte[] bytes, int off, int len) throws SaslException { + if (saslClient != null) { + return saslClient.wrap(bytes, off, len); + } else { + return saslServer.wrap(bytes, off, len); + } + } + + /** + * Unwraps a byte array. + * + * @param bytes The array containing the bytes to unwrap. + * @param off The starting position at the array + * @param len The number of bytes to unwrap + * @return byte[] unwrapped bytes + * @throws SaslException if the bytes cannot be successfully unwrapped + */ + public byte[] unwrap(byte[] bytes, int off, int len) throws SaslException { + if (saslClient != null) { + return saslClient.unwrap(bytes, off, len); + } else { + return saslServer.unwrap(bytes, off, len); + } + } /** * Returns true if SASL negotiation is complete. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslResponseWithNegotiatedCipherOption.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslResponseWithNegotiatedCipherOption.java new file mode 100644 index 00000000000..f69441b95f9 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslResponseWithNegotiatedCipherOption.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.protocol.datatransfer.sasl; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.crypto.CipherOption; + +@InterfaceAudience.Private +public class SaslResponseWithNegotiatedCipherOption { + final byte[] payload; + final CipherOption cipherOption; + + public SaslResponseWithNegotiatedCipherOption(byte[] payload, + CipherOption cipherOption) { + this.payload = payload; + this.cipherOption = cipherOption; + } +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java index 83c2a62d493..64355a92d97 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java @@ -58,6 +58,7 @@ import org.apache.hadoop.hdfs.protocol.CacheDirectiveStats; import org.apache.hadoop.hdfs.protocol.CachePoolEntry; import org.apache.hadoop.hdfs.protocol.CachePoolInfo; import org.apache.hadoop.hdfs.protocol.CachePoolStats; +import org.apache.hadoop.crypto.CipherOption; import org.apache.hadoop.crypto.CipherSuite; import org.apache.hadoop.hdfs.protocol.ClientProtocol; import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks; @@ -128,6 +129,7 @@ import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockWithLocationsProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlocksWithLocationsProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.CheckpointCommandProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.CheckpointSignatureProto; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.CipherOptionProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ContentSummaryProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.CorruptFileBlocksProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DataEncryptionKeyProto; @@ -2685,6 +2687,83 @@ public class PBHelper { return GetEditsFromTxidResponseProto.newBuilder().setEventsList( builder.build()).build(); } + + public static CipherOptionProto convert(CipherOption option) { + if (option != null) { + CipherOptionProto.Builder builder = CipherOptionProto. + newBuilder(); + if (option.getCipherSuite() != null) { + builder.setSuite(convert(option.getCipherSuite())); + } + if (option.getInKey() != null) { + builder.setInKey(ByteString.copyFrom(option.getInKey())); + } + if (option.getInIv() != null) { + builder.setInIv(ByteString.copyFrom(option.getInIv())); + } + if (option.getOutKey() != null) { + builder.setOutKey(ByteString.copyFrom(option.getOutKey())); + } + if (option.getOutIv() != null) { + builder.setOutIv(ByteString.copyFrom(option.getOutIv())); + } + return builder.build(); + } + return null; + } + + public static CipherOption convert(CipherOptionProto proto) { + if (proto != null) { + CipherSuite suite = null; + if (proto.getSuite() != null) { + suite = convert(proto.getSuite()); + } + byte[] inKey = null; + if (proto.getInKey() != null) { + inKey = proto.getInKey().toByteArray(); + } + byte[] inIv = null; + if (proto.getInIv() != null) { + inIv = proto.getInIv().toByteArray(); + } + byte[] outKey = null; + if (proto.getOutKey() != null) { + outKey = proto.getOutKey().toByteArray(); + } + byte[] outIv = null; + if (proto.getOutIv() != null) { + outIv = proto.getOutIv().toByteArray(); + } + return new CipherOption(suite, inKey, inIv, outKey, outIv); + } + return null; + } + + public static List convertCipherOptions( + List options) { + if (options != null) { + List protos = + Lists.newArrayListWithCapacity(options.size()); + for (CipherOption option : options) { + protos.add(convert(option)); + } + return protos; + } + return null; + } + + public static List convertCipherOptionProtos( + List protos) { + if (protos != null) { + List options = + Lists.newArrayListWithCapacity(protos.size()); + for (CipherOptionProto proto : protos) { + options.add(convert(proto)); + } + return options; + } + return null; + } public static CipherSuiteProto convert(CipherSuite suite) { switch (suite) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java index 5c0ea512ccd..6682ba38104 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java @@ -787,7 +787,7 @@ public class Dispatcher { : Executors.newFixedThreadPool(dispatcherThreads); this.maxConcurrentMovesPerNode = maxConcurrentMovesPerNode; - this.saslClient = new SaslDataTransferClient( + this.saslClient = new SaslDataTransferClient(conf, DataTransferSaslUtil.getSaslPropertiesResolver(conf), TrustedChannelResolver.getInstance(conf), nnc.fallbackToSimpleAuth); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java index 31276825603..67cd1ce1aae 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java @@ -64,6 +64,7 @@ import org.apache.hadoop.security.SaslPropertiesResolver; */ @InterfaceAudience.Private public class DNConf { + final Configuration conf; final int socketTimeout; final int socketWriteTimeout; final int socketKeepaliveTimeout; @@ -100,6 +101,7 @@ public class DNConf { final long maxLockedMemory; public DNConf(Configuration conf) { + this.conf = conf; socketTimeout = conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, HdfsServerConstants.READ_TIMEOUT); socketWriteTimeout = conf.getInt(DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY, @@ -197,6 +199,15 @@ public class DNConf { String getMinimumNameNodeVersion() { return this.minimumNameNodeVersion; } + + /** + * Returns the configuration. + * + * @return Configuration the configuration + */ + public Configuration getConf() { + return conf; + } /** * Returns true if encryption enabled for DataTransferProtocol. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index 7198c87398d..7cade186c01 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@ -1077,8 +1077,8 @@ public class DataNode extends ReconfigurableBase // Create the ReadaheadPool from the DataNode context so we can // exit without having to explicitly shutdown its thread pool. readaheadPool = ReadaheadPool.getInstance(); - saslClient = new SaslDataTransferClient(dnConf.saslPropsResolver, - dnConf.trustedChannelResolver); + saslClient = new SaslDataTransferClient(dnConf.conf, + dnConf.saslPropsResolver, dnConf.trustedChannelResolver); saslServer = new SaslDataTransferServer(dnConf, blockPoolTokenSecretManager); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto index 50cc00d91ac..fd1ba8a0980 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto @@ -42,6 +42,7 @@ message DataTransferEncryptorMessageProto { required DataTransferEncryptorStatus status = 1; optional bytes payload = 2; optional string message = 3; + repeated CipherOptionProto cipherOption = 4; } message BaseHeaderProto { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto index 10af3b8b61b..04a8f3f0fb0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto @@ -264,6 +264,17 @@ message ZoneEncryptionInfoProto { required string keyName = 3; } +/** + * Cipher option + */ +message CipherOptionProto { + required CipherSuiteProto suite = 1; + optional bytes inKey = 2; + optional bytes inIv = 3; + optional bytes outKey = 4; + optional bytes outIv = 5; +} + /** * A set of file blocks and their locations. */ diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index 5ba1eec5117..2bf42538c36 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -1514,6 +1514,18 @@ the configured JCE default on the system is used (usually 3DES.) It is widely believed that 3DES is more cryptographically secure, but RC4 is substantially faster. + + Note that if AES is supported by both the client and server then this + encryption algorithm will only be used to initially transfer keys for AES. + + + + + dfs.encrypt.data.transfer.cipher.key.bitlength + 128 + + The key bitlength negotiated by dfsclient and datanode for encryption. + This value may be set to either 128, 192 or 256. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestEncryptedTransfer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestEncryptedTransfer.java index 131912d3c04..7f6ad1a38ca 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestEncryptedTransfer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestEncryptedTransfer.java @@ -37,11 +37,15 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver; +import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil; +import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferServer; import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption; import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.GenericTestUtils.LogCapturer; +import org.apache.log4j.Level; +import org.apache.log4j.LogManager; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -50,6 +54,10 @@ import org.mockito.Mockito; @RunWith(Parameterized.class) public class TestEncryptedTransfer { + { + LogManager.getLogger(SaslDataTransferServer.class).setLevel(Level.DEBUG); + LogManager.getLogger(DataTransferSaslUtil.class).setLevel(Level.DEBUG); + } @Parameters public static Collection data() { @@ -111,9 +119,28 @@ public class TestEncryptedTransfer { .build(); fs = getFileSystem(conf); - assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH)); - assertEquals(checksum, fs.getFileChecksum(TEST_PATH)); + LogCapturer logs = GenericTestUtils.LogCapturer.captureLogs( + LogFactory.getLog(SaslDataTransferServer.class)); + LogCapturer logs1 = GenericTestUtils.LogCapturer.captureLogs( + LogFactory.getLog(DataTransferSaslUtil.class)); + try { + assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH)); + assertEquals(checksum, fs.getFileChecksum(TEST_PATH)); + } finally { + logs.stopCapturing(); + logs1.stopCapturing(); + } + fs.close(); + + if (resolverClazz != null && !resolverClazz.endsWith("TestTrustedChannelResolver")){ + // Test client and server negotiate cipher option + GenericTestUtils.assertMatches(logs.getOutput(), + "Server using cipher suite"); + // Check the IOStreamPair + GenericTestUtils.assertMatches(logs1.getOutput(), + "Creating IOStreamPair of CryptoInputStream and CryptoOutputStream."); + } } finally { if (cluster != null) { cluster.shutdown(); @@ -403,9 +430,28 @@ public class TestEncryptedTransfer { cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDns).build(); FileSystem fs = getFileSystem(conf); - writeTestDataToFile(fs); + + LogCapturer logs = GenericTestUtils.LogCapturer.captureLogs( + LogFactory.getLog(SaslDataTransferServer.class)); + LogCapturer logs1 = GenericTestUtils.LogCapturer.captureLogs( + LogFactory.getLog(DataTransferSaslUtil.class)); + try { + writeTestDataToFile(fs); + } finally { + logs.stopCapturing(); + logs1.stopCapturing(); + } assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH)); fs.close(); + + if (resolverClazz != null && !resolverClazz.endsWith("TestTrustedChannelResolver")){ + // Test client and server negotiate cipher option + GenericTestUtils.assertMatches(logs.getOutput(), + "Server using cipher suite"); + // Check the IOStreamPair + GenericTestUtils.assertMatches(logs1.getOutput(), + "Creating IOStreamPair of CryptoInputStream and CryptoOutputStream."); + } } finally { if (cluster != null) { cluster.shutdown();