From 0d252918f65aca8dba070f2da1d32de8b2fd1c1c Mon Sep 17 00:00:00 2001 From: zhangduo Date: Thu, 5 May 2016 10:06:55 +0800 Subject: [PATCH] HBASE-15743 Add Transparent Data Encryption support for FanOutOneBlockAsyncDFSOutput --- .../asyncfs/FanOutOneBlockAsyncDFSOutput.java | 24 +- .../FanOutOneBlockAsyncDFSOutputHelper.java | 9 +- ...anOutOneBlockAsyncDFSOutputSaslHelper.java | 239 +++++++++++++----- .../TestSaslFanOutOneBlockAsyncDFSOutput.java | 75 +++++- 4 files changed, 264 insertions(+), 83 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java index 7d6a676d183..8dd7f5e1bcb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java @@ -43,6 +43,7 @@ import io.netty.util.concurrent.FutureListener; import io.netty.util.concurrent.Promise; import java.io.IOException; +import java.nio.ByteBuffer; import java.nio.channels.CompletionHandler; import java.util.ArrayDeque; import java.util.Collection; @@ -58,6 +59,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.CancelOnClose; +import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper.CryptoCodec; import org.apache.hadoop.hbase.util.CancelableProgressable; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hdfs.DFSClient; @@ -119,6 +121,8 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput { private final LocatedBlock locatedBlock; + private final CryptoCodec cryptoCodec; + private final EventLoop eventLoop; private final List datanodeList; @@ -317,8 +321,8 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput { FanOutOneBlockAsyncDFSOutput(Configuration conf, FSUtils fsUtils, DistributedFileSystem dfs, DFSClient client, ClientProtocol namenode, String clientName, String src, long fileId, - LocatedBlock locatedBlock, EventLoop eventLoop, List datanodeList, - DataChecksum summer, ByteBufAllocator alloc) { + LocatedBlock locatedBlock, CryptoCodec cryptoCodec, EventLoop eventLoop, + List datanodeList, DataChecksum summer, ByteBufAllocator alloc) { this.conf = conf; this.fsUtils = fsUtils; this.dfs = dfs; @@ -328,6 +332,7 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput { this.clientName = clientName; this.src = src; this.locatedBlock = locatedBlock; + this.cryptoCodec = cryptoCodec; this.eventLoop = eventLoop; this.datanodeList = datanodeList; this.summer = summer; @@ -342,16 +347,27 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput { write(b, 0, b.length); } + private void write0(byte[] b, final int off, final int len) { + buf.ensureWritable(len); + if (cryptoCodec == null) { + buf.writeBytes(b, off, len); + } else { + ByteBuffer inBuffer = ByteBuffer.wrap(b, off, len); + cryptoCodec.encrypt(inBuffer, buf.nioBuffer(buf.writerIndex(), len)); + buf.writerIndex(buf.writerIndex() + len); + } + } + @Override public void write(final byte[] b, final int off, final int len) { if (eventLoop.inEventLoop()) { - buf.ensureWritable(len).writeBytes(b, off, len); + write0(b, off, len); } else { eventLoop.submit(new Runnable() { @Override public void run() { - buf.ensureWritable(len).writeBytes(b, off, len); + write0(b, off, len); } }).syncUninterruptibly(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java index 4f9058cc559..7b680e15b26 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java @@ -21,6 +21,7 @@ import static io.netty.channel.ChannelOption.CONNECT_TIMEOUT_MILLIS; import static io.netty.handler.timeout.IdleState.READER_IDLE; import static org.apache.hadoop.fs.CreateFlag.CREATE; import static org.apache.hadoop.fs.CreateFlag.OVERWRITE; +import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper.createCryptoCodec; import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper.trySaslNegotiate; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME; @@ -73,6 +74,7 @@ import org.apache.hadoop.fs.UnresolvedLinkException; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.ConnectionUtils; +import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper.CryptoCodec; import org.apache.hadoop.hbase.util.CancelableProgressable; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hdfs.DFSClient; @@ -672,9 +674,12 @@ public final class FanOutOneBlockAsyncDFSOutputHelper { // layer should retry itself if needed. datanodeList.add(future.syncUninterruptibly().getNow()); } + CryptoCodec cryptocodec = createCryptoCodec(conf, stat, client); + FanOutOneBlockAsyncDFSOutput output = new FanOutOneBlockAsyncDFSOutput(conf, fsUtils, dfs, + client, namenode, clientName, src, stat.getFileId(), locatedBlock, cryptocodec, eventLoop, + datanodeList, summer, ALLOC); succ = true; - return new FanOutOneBlockAsyncDFSOutput(conf, fsUtils, dfs, client, namenode, clientName, - src, stat.getFileId(), locatedBlock, eventLoop, datanodeList, summer, ALLOC); + return output; } finally { if (!succ) { if (futureList != null) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputSaslHelper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputSaslHelper.java index 22c4e044a29..33e88416db4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputSaslHelper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputSaslHelper.java @@ -21,6 +21,7 @@ import static io.netty.handler.timeout.IdleState.READER_IDLE; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Charsets; +import com.google.common.base.Throwables; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -79,6 +80,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hdfs.DFSClient; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException; import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto; @@ -111,7 +113,7 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper { @VisibleForTesting static final String DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY = - "dfs.encrypt.data.transfer.cipher.suites"; + "dfs.encrypt.data.transfer.cipher.suites"; @VisibleForTesting static final String AES_CTR_NOPADDING = "AES/CTR/NoPadding"; @@ -129,7 +131,7 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper { private static final SaslAdaptor SASL_ADAPTOR; - private interface CipherHelper { + private interface CipherOptionHelper { List getCipherOptions(Configuration conf) throws IOException; @@ -150,9 +152,19 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper { byte[] getOutIv(Object cipherOption); } - private static final CipherHelper CIPHER_HELPER; + private static final CipherOptionHelper CIPHER_OPTION_HELPER; - private static final class CryptoCodec { + private interface TransparentCryptoHelper { + + Object getFileEncryptionInfo(HdfsFileStatus stat); + + CryptoCodec createCryptoCodec(Configuration conf, Object feInfo, DFSClient client) + throws IOException; + } + + private static final TransparentCryptoHelper TRANSPARENT_CRYPTO_HELPER; + + static final class CryptoCodec { private static final Method CREATE_CODEC; @@ -215,23 +227,34 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper { private final Object decryptor; public CryptoCodec(Configuration conf, Object cipherOption) { - Object codec; try { - codec = CREATE_CODEC.invoke(null, conf, CIPHER_HELPER.getCipherSuite(cipherOption)); + Object codec = CREATE_CODEC.invoke(null, conf, + CIPHER_OPTION_HELPER.getCipherSuite(cipherOption)); encryptor = CREATE_ENCRYPTOR.invoke(codec); - byte[] encKey = CIPHER_HELPER.getInKey(cipherOption); - byte[] encIv = CIPHER_HELPER.getInIv(cipherOption); + byte[] encKey = CIPHER_OPTION_HELPER.getInKey(cipherOption); + byte[] encIv = CIPHER_OPTION_HELPER.getInIv(cipherOption); INIT_ENCRYPTOR.invoke(encryptor, encKey, Arrays.copyOf(encIv, encIv.length)); decryptor = CREATE_DECRYPTOR.invoke(codec); - byte[] decKey = CIPHER_HELPER.getOutKey(cipherOption); - byte[] decIv = CIPHER_HELPER.getOutIv(cipherOption); + byte[] decKey = CIPHER_OPTION_HELPER.getOutKey(cipherOption); + byte[] decIv = CIPHER_OPTION_HELPER.getOutIv(cipherOption); INIT_DECRYPTOR.invoke(decryptor, decKey, Arrays.copyOf(decIv, decIv.length)); } catch (IllegalAccessException | InvocationTargetException e) { throw new RuntimeException(e); } } + public CryptoCodec(Configuration conf, Object cipherSuite, byte[] encKey, byte[] encIv) { + try { + Object codec = CREATE_CODEC.invoke(null, conf, cipherSuite); + encryptor = CREATE_ENCRYPTOR.invoke(codec); + INIT_ENCRYPTOR.invoke(encryptor, encKey, encIv); + decryptor = null; + } catch (IllegalAccessException | InvocationTargetException e) { + throw new RuntimeException(e); + } + } + public void encrypt(ByteBuffer inBuffer, ByteBuffer outBuffer) { try { ENCRYPT.invoke(encryptor, inBuffer, outBuffer); @@ -251,17 +274,17 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper { private static SaslAdaptor createSaslAdaptor27(Class saslDataTransferClientClass) throws NoSuchFieldException, NoSuchMethodException { - final Field saslPropsResolverField = - saslDataTransferClientClass.getDeclaredField("saslPropsResolver"); + final Field saslPropsResolverField = saslDataTransferClientClass + .getDeclaredField("saslPropsResolver"); saslPropsResolverField.setAccessible(true); - final Field trustedChannelResolverField = - saslDataTransferClientClass.getDeclaredField("trustedChannelResolver"); + final Field trustedChannelResolverField = saslDataTransferClientClass + .getDeclaredField("trustedChannelResolver"); trustedChannelResolverField.setAccessible(true); - final Field fallbackToSimpleAuthField = - saslDataTransferClientClass.getDeclaredField("fallbackToSimpleAuth"); + final Field fallbackToSimpleAuthField = saslDataTransferClientClass + .getDeclaredField("fallbackToSimpleAuth"); fallbackToSimpleAuthField.setAccessible(true); - final Method getSaslDataTransferClientMethod = - DFSClient.class.getMethod("getSaslDataTransferClient"); + final Method getSaslDataTransferClientMethod = DFSClient.class + .getMethod("getSaslDataTransferClient"); final Method newDataEncryptionKeyMethod = DFSClient.class.getMethod("newDataEncryptionKey"); return new SaslAdaptor() { @@ -288,8 +311,8 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper { @Override public AtomicBoolean getFallbackToSimpleAuth(DFSClient client) { try { - return (AtomicBoolean) fallbackToSimpleAuthField.get(getSaslDataTransferClientMethod - .invoke(client)); + return (AtomicBoolean) fallbackToSimpleAuthField + .get(getSaslDataTransferClientMethod.invoke(client)); } catch (IllegalAccessException | InvocationTargetException e) { throw new RuntimeException(e); } @@ -308,8 +331,8 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper { private static SaslAdaptor createSaslAdaptor25() { try { - final Field trustedChannelResolverField = - DFSClient.class.getDeclaredField("trustedChannelResolver"); + final Field trustedChannelResolverField = DFSClient.class + .getDeclaredField("trustedChannelResolver"); trustedChannelResolverField.setAccessible(true); final Method getDataEncryptionKeyMethod = DFSClient.class.getMethod("getDataEncryptionKey"); return new SaslAdaptor() { @@ -351,8 +374,8 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper { private static SaslAdaptor createSaslAdaptor() { Class saslDataTransferClientClass = null; try { - saslDataTransferClientClass = - Class.forName("org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient"); + saslDataTransferClientClass = Class + .forName("org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient"); } catch (ClassNotFoundException e) { LOG.warn("No SaslDataTransferClient class found, should be hadoop 2.5-"); } @@ -364,8 +387,8 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper { } } - private static CipherHelper createCipherHelper25() { - return new CipherHelper() { + private static CipherOptionHelper createCipherHelper25() { + return new CipherOptionHelper() { @Override public byte[] getOutKey(Object cipherOption) { @@ -410,18 +433,17 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper { }; } - private static CipherHelper createCipherHelper27(Class cipherOptionClass) + private static CipherOptionHelper createCipherHelper27(Class cipherOptionClass) throws ClassNotFoundException, NoSuchMethodException { @SuppressWarnings("rawtypes") - Class cipherSuiteClass = - Class.forName("org.apache.hadoop.crypto.CipherSuite").asSubclass(Enum.class); + Class cipherSuiteClass = Class.forName("org.apache.hadoop.crypto.CipherSuite") + .asSubclass(Enum.class); @SuppressWarnings("unchecked") final Enum aesCipherSuite = Enum.valueOf(cipherSuiteClass, "AES_CTR_NOPADDING"); - final Constructor cipherOptionConstructor = - cipherOptionClass.getConstructor(cipherSuiteClass); - final Constructor cipherOptionWithKeyAndIvConstructor = - cipherOptionClass.getConstructor(cipherSuiteClass, byte[].class, byte[].class, - byte[].class, byte[].class); + final Constructor cipherOptionConstructor = cipherOptionClass + .getConstructor(cipherSuiteClass); + final Constructor cipherOptionWithKeyAndIvConstructor = cipherOptionClass + .getConstructor(cipherSuiteClass, byte[].class, byte[].class, byte[].class, byte[].class); final Method getCipherSuiteMethod = cipherOptionClass.getMethod("getCipherSuite"); final Method getInKeyMethod = cipherOptionClass.getMethod("getInKey"); @@ -429,16 +451,15 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper { final Method getOutKeyMethod = cipherOptionClass.getMethod("getOutKey"); final Method getOutIvMethod = cipherOptionClass.getMethod("getOutIv"); - final Method convertCipherOptionsMethod = - PBHelper.class.getMethod("convertCipherOptions", List.class); - final Method convertCipherOptionProtosMethod = - PBHelper.class.getMethod("convertCipherOptionProtos", List.class); - final Method addAllCipherOptionMethod = - DataTransferEncryptorMessageProto.Builder.class.getMethod("addAllCipherOption", - Iterable.class); - final Method getCipherOptionListMethod = - DataTransferEncryptorMessageProto.class.getMethod("getCipherOptionList"); - return new CipherHelper() { + final Method convertCipherOptionsMethod = PBHelper.class.getMethod("convertCipherOptions", + List.class); + final Method convertCipherOptionProtosMethod = PBHelper.class + .getMethod("convertCipherOptionProtos", List.class); + final Method addAllCipherOptionMethod = DataTransferEncryptorMessageProto.Builder.class + .getMethod("addAllCipherOption", Iterable.class); + final Method getCipherOptionListMethod = DataTransferEncryptorMessageProto.class + .getMethod("getCipherOptionList"); + return new CipherOptionHelper() { @Override public byte[] getOutKey(Object cipherOption) { @@ -532,9 +553,8 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper { boolean isNegotiatedQopPrivacy, SaslClient saslClient) throws IOException { List cipherOptions; try { - cipherOptions = - (List) convertCipherOptionProtosMethod.invoke(null, - getCipherOptionListMethod.invoke(proto)); + cipherOptions = (List) convertCipherOptionProtosMethod.invoke(null, + getCipherOptionListMethod.invoke(proto)); } catch (IllegalAccessException | InvocationTargetException e) { throw new RuntimeException(e); } @@ -557,7 +577,7 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper { }; } - private static CipherHelper createCipherHelper() { + private static CipherOptionHelper createCipherHelper() { Class cipherOptionClass; try { cipherOptionClass = Class.forName("org.apache.hadoop.crypto.CipherOption"); @@ -572,9 +592,79 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper { } } + private static TransparentCryptoHelper createTransparentCryptoHelper25() { + return new TransparentCryptoHelper() { + + @Override + public Object getFileEncryptionInfo(HdfsFileStatus stat) { + return null; + } + + @Override + public CryptoCodec createCryptoCodec(Configuration conf, Object feInfo, DFSClient client) { + throw new UnsupportedOperationException(); + } + }; + } + + private static TransparentCryptoHelper createTransparentCryptoHelper27(Class feInfoClass) + throws NoSuchMethodException, ClassNotFoundException { + final Method getFileEncryptionInfoMethod = HdfsFileStatus.class + .getMethod("getFileEncryptionInfo"); + final Method decryptEncryptedDataEncryptionKeyMethod = DFSClient.class + .getDeclaredMethod("decryptEncryptedDataEncryptionKey", feInfoClass); + decryptEncryptedDataEncryptionKeyMethod.setAccessible(true); + final Method getCipherSuiteMethod = feInfoClass.getMethod("getCipherSuite"); + Class keyVersionClass = Class.forName("org.apache.hadoop.crypto.key.KeyProvider$KeyVersion"); + final Method getMaterialMethod = keyVersionClass.getMethod("getMaterial"); + final Method getIVMethod = feInfoClass.getMethod("getIV"); + return new TransparentCryptoHelper() { + + @Override + public Object getFileEncryptionInfo(HdfsFileStatus stat) { + try { + return getFileEncryptionInfoMethod.invoke(stat); + } catch (IllegalAccessException | InvocationTargetException e) { + throw new RuntimeException(e); + } + } + + @Override + public CryptoCodec createCryptoCodec(Configuration conf, Object feInfo, DFSClient client) + throws IOException { + try { + Object decrypted = decryptEncryptedDataEncryptionKeyMethod.invoke(client, feInfo); + return new CryptoCodec(conf, getCipherSuiteMethod.invoke(feInfo), + (byte[]) getMaterialMethod.invoke(decrypted), (byte[]) getIVMethod.invoke(feInfo)); + } catch (InvocationTargetException e) { + Throwables.propagateIfPossible(e.getTargetException(), IOException.class); + throw new RuntimeException(e.getTargetException()); + } catch (IllegalAccessException e) { + throw new RuntimeException(e); + } + } + }; + } + + private static TransparentCryptoHelper createTransparentCryptoHelper() { + Class feInfoClass; + try { + feInfoClass = Class.forName("org.apache.hadoop.fs.FileEncryptionInfo"); + } catch (ClassNotFoundException e) { + LOG.warn("No FileEncryptionInfo class found, should be hadoop 2.5-"); + return createTransparentCryptoHelper25(); + } + try { + return createTransparentCryptoHelper27(feInfoClass); + } catch (NoSuchMethodException | ClassNotFoundException e) { + throw new Error(e); + } + } + static { SASL_ADAPTOR = createSaslAdaptor(); - CIPHER_HELPER = createCipherHelper(); + CIPHER_OPTION_HELPER = createCipherHelper(); + TRANSPARENT_CRYPTO_HELPER = createTransparentCryptoHelper(); } /** @@ -643,9 +733,8 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper { Map saslProps, int timeoutMs, Promise promise) throws SaslException { this.conf = conf; this.saslProps = saslProps; - this.saslClient = - Sasl.createSaslClient(new String[] { MECHANISM }, username, PROTOCOL, SERVER_NAME, - saslProps, new SaslClientCallbackHandler(username, password)); + this.saslClient = Sasl.createSaslClient(new String[] { MECHANISM }, username, PROTOCOL, + SERVER_NAME, saslProps, new SaslClientCallbackHandler(username, password)); this.timeoutMs = timeoutMs; this.promise = promise; } @@ -656,14 +745,14 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper { private void sendSaslMessage(ChannelHandlerContext ctx, byte[] payload, List options) throws IOException { - DataTransferEncryptorMessageProto.Builder builder = - DataTransferEncryptorMessageProto.newBuilder(); + DataTransferEncryptorMessageProto.Builder builder = DataTransferEncryptorMessageProto + .newBuilder(); builder.setStatus(DataTransferEncryptorStatus.SUCCESS); if (payload != null) { builder.setPayload(ByteString.copyFrom(payload)); } if (options != null) { - CIPHER_HELPER.addCipherOptions(builder, options); + CIPHER_OPTION_HELPER.addCipherOptions(builder, options); } DataTransferEncryptorMessageProto proto = builder.build(); int size = proto.getSerializedSize(); @@ -704,8 +793,8 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper { } private boolean requestedQopContainsPrivacy() { - Set requestedQop = - ImmutableSet.copyOf(Arrays.asList(saslProps.get(Sasl.QOP).split(","))); + Set requestedQop = ImmutableSet + .copyOf(Arrays.asList(saslProps.get(Sasl.QOP).split(","))); return requestedQop.contains("auth-conf"); } @@ -713,15 +802,16 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper { if (!saslClient.isComplete()) { throw new IOException("Failed to complete SASL handshake"); } - Set requestedQop = - ImmutableSet.copyOf(Arrays.asList(saslProps.get(Sasl.QOP).split(","))); + Set requestedQop = ImmutableSet + .copyOf(Arrays.asList(saslProps.get(Sasl.QOP).split(","))); String negotiatedQop = getNegotiatedQop(); - LOG.debug("Verifying QOP, requested QOP = " + requestedQop + ", negotiated QOP = " - + negotiatedQop); + LOG.debug( + "Verifying QOP, requested QOP = " + requestedQop + ", negotiated QOP = " + negotiatedQop); if (!requestedQop.contains(negotiatedQop)) { throw new IOException(String.format("SASL handshake completed, but " + "channel does not have acceptable quality of protection, " - + "requested = %s, negotiated = %s", requestedQop, negotiatedQop)); + + "requested = %s, negotiated = %s", + requestedQop, negotiatedQop)); } } @@ -741,7 +831,7 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper { case 1: { List cipherOptions = null; if (requestedQopContainsPrivacy()) { - cipherOptions = CIPHER_HELPER.getCipherOptions(conf); + cipherOptions = CIPHER_OPTION_HELPER.getCipherOptions(conf); } sendSaslMessage(ctx, response, cipherOptions); ctx.flush(); @@ -752,7 +842,7 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper { assert response == null; checkSaslComplete(); Object cipherOption = - CIPHER_HELPER.getCipherOption(proto, isNegotiatedQopPrivacy(), saslClient); + CIPHER_OPTION_HELPER.getCipherOption(proto, isNegotiatedQopPrivacy(), saslClient); ChannelPipeline p = ctx.pipeline(); while (p.first() != null) { p.removeFirst(); @@ -762,8 +852,9 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper { p.addLast(new EncryptHandler(codec), new DecryptHandler(codec)); } else { if (useWrap()) { - p.addLast(new SaslWrapHandler(saslClient), new LengthFieldBasedFrameDecoder( - Integer.MAX_VALUE, 0, 4), new SaslUnwrapHandler(saslClient)); + p.addLast(new SaslWrapHandler(saslClient), + new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4), + new SaslUnwrapHandler(saslClient)); } } promise.trySuccess(null); @@ -992,8 +1083,8 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper { } if (encryptionKey != null) { if (LOG.isDebugEnabled()) { - LOG.debug("SASL client doing encrypted handshake for addr = " + addr + ", datanodeId = " - + dnInfo); + LOG.debug( + "SASL client doing encrypted handshake for addr = " + addr + ", datanodeId = " + dnInfo); } doSaslNegotiation(conf, channel, timeoutMs, getUserNameFromEncryptionKey(encryptionKey), encryptionKeyToPassword(encryptionKey.encryptionKey), @@ -1018,8 +1109,8 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper { saslPromise.trySuccess(null); } else if (saslPropsResolver != null) { if (LOG.isDebugEnabled()) { - LOG.debug("SASL client doing general handshake for addr = " + addr + ", datanodeId = " - + dnInfo); + LOG.debug( + "SASL client doing general handshake for addr = " + addr + ", datanodeId = " + dnInfo); } doSaslNegotiation(conf, channel, timeoutMs, buildUsername(accessToken), buildClientPassword(accessToken), saslPropsResolver.getClientProperties(addr), saslPromise); @@ -1035,4 +1126,12 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper { } } + static CryptoCodec createCryptoCodec(Configuration conf, HdfsFileStatus stat, DFSClient client) + throws IOException { + Object feInfo = TRANSPARENT_CRYPTO_HELPER.getFileEncryptionInfo(stat); + if (feInfo == null) { + return null; + } + return TRANSPARENT_CRYPTO_HELPER.createCryptoCodec(conf, feInfo, client); + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSaslFanOutOneBlockAsyncDFSOutput.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSaslFanOutOneBlockAsyncDFSOutput.java index 545a39e4838..4637a0143c0 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSaslFanOutOneBlockAsyncDFSOutput.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSaslFanOutOneBlockAsyncDFSOutput.java @@ -29,6 +29,8 @@ import io.netty.channel.nio.NioEventLoopGroup; import java.io.File; import java.io.IOException; +import java.lang.reflect.Method; +import java.net.URI; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -42,7 +44,7 @@ import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.http.ssl.KeyStoreTestUtil; import org.apache.hadoop.hbase.security.HBaseKerberosUtils; import org.apache.hadoop.hbase.security.token.TestGenerateDelegationToken; -import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.MiscTests; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DistributedFileSystem; @@ -63,7 +65,7 @@ import org.junit.runners.Parameterized.Parameter; import org.junit.runners.Parameterized.Parameters; @RunWith(Parameterized.class) -@Category({ MiscTests.class, MediumTests.class }) +@Category({ MiscTests.class, LargeTests.class }) public class TestSaslFanOutOneBlockAsyncDFSOutput { private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); @@ -74,8 +76,8 @@ public class TestSaslFanOutOneBlockAsyncDFSOutput { private static int READ_TIMEOUT_MS = 200000; - private static final File KEYTAB_FILE = new File( - TEST_UTIL.getDataTestDir("keytab").toUri().getPath()); + private static final File KEYTAB_FILE = + new File(TEST_UTIL.getDataTestDir("keytab").toUri().getPath()); private static MiniKdc KDC; @@ -86,6 +88,11 @@ public class TestSaslFanOutOneBlockAsyncDFSOutput { private static String PRINCIPAL; private static String HTTP_PRINCIPAL; + + private static String TEST_KEY_NAME = "test_key"; + + private static boolean TEST_TRANSPARENT_ENCRYPTION = true; + @Rule public TestName name = new TestName(); @@ -98,13 +105,20 @@ public class TestSaslFanOutOneBlockAsyncDFSOutput { @Parameter(2) public String cipherSuite; - @Parameters(name = "{index}: protection={0}, encryption={1}, cipherSuite={2}") + @Parameter(3) + public boolean useTransparentEncryption; + + @Parameters( + name = "{index}: protection={0}, encryption={1}, cipherSuite={2}, transparent_enc={3}") public static Iterable data() { List params = new ArrayList<>(); for (String protection : Arrays.asList("authentication", "integrity", "privacy")) { for (String encryptionAlgorithm : Arrays.asList("", "3des", "rc4")) { for (String cipherSuite : Arrays.asList("", AES_CTR_NOPADDING)) { - params.add(new Object[] { protection, encryptionAlgorithm, cipherSuite }); + for (boolean useTransparentEncryption : Arrays.asList(false, true)) { + params.add(new Object[] { protection, encryptionAlgorithm, cipherSuite, + useTransparentEncryption }); + } } } } @@ -132,6 +146,35 @@ public class TestSaslFanOutOneBlockAsyncDFSOutput { conf.setBoolean("ignore.secure.ports.for.testing", true); } + private static void setUpKeyProvider(Configuration conf) throws Exception { + Class keyProviderFactoryClass; + try { + keyProviderFactoryClass = Class.forName("org.apache.hadoop.crypto.key.KeyProviderFactory"); + } catch (ClassNotFoundException e) { + // should be hadoop 2.5-, give up + TEST_TRANSPARENT_ENCRYPTION = false; + return; + } + + URI keyProviderUri = + new URI("jceks://file" + TEST_UTIL.getDataTestDir("test.jks").toUri().toString()); + conf.set("dfs.encryption.key.provider.uri", keyProviderUri.toString()); + Method getKeyProviderMethod = + keyProviderFactoryClass.getMethod("get", URI.class, Configuration.class); + Object keyProvider = getKeyProviderMethod.invoke(null, keyProviderUri, conf); + Class keyProviderClass = Class.forName("org.apache.hadoop.crypto.key.KeyProvider"); + Class keyProviderOptionsClass = + Class.forName("org.apache.hadoop.crypto.key.KeyProvider$Options"); + Method createKeyMethod = + keyProviderClass.getMethod("createKey", String.class, keyProviderOptionsClass); + Object options = keyProviderOptionsClass.getConstructor(Configuration.class).newInstance(conf); + createKeyMethod.invoke(keyProvider, TEST_KEY_NAME, options); + Method flushMethod = keyProviderClass.getMethod("flush"); + flushMethod.invoke(keyProvider); + Method closeMethod = keyProviderClass.getMethod("close"); + closeMethod.invoke(keyProvider); + } + @BeforeClass public static void setUpBeforeClass() throws Exception { EVENT_LOOP_GROUP = new NioEventLoopGroup(); @@ -144,6 +187,8 @@ public class TestSaslFanOutOneBlockAsyncDFSOutput { PRINCIPAL = USERNAME + "/" + HOST; HTTP_PRINCIPAL = "HTTP/" + HOST; KDC.createPrincipal(KEYTAB_FILE, PRINCIPAL, HTTP_PRINCIPAL); + + setUpKeyProvider(TEST_UTIL.getConfiguration()); setHdfsSecuredConfiguration(TEST_UTIL.getConfiguration()); HBaseKerberosUtils.setKeytabFileForTesting(KEYTAB_FILE.getAbsolutePath()); HBaseKerberosUtils.setPrincipalForTesting(PRINCIPAL + "@" + KDC.getRealm()); @@ -161,6 +206,17 @@ public class TestSaslFanOutOneBlockAsyncDFSOutput { } } + private Path testDirOnTestFs; + + private void createEncryptionZone() throws Exception { + if (!TEST_TRANSPARENT_ENCRYPTION) { + return; + } + Method method = + DistributedFileSystem.class.getMethod("createEncryptionZone", Path.class, String.class); + method.invoke(FS, testDirOnTestFs, TEST_KEY_NAME); + } + @Before public void setUp() throws Exception { TEST_UTIL.getConfiguration().set("dfs.data.transfer.protection", protection); @@ -182,6 +238,11 @@ public class TestSaslFanOutOneBlockAsyncDFSOutput { TEST_UTIL.startMiniDFSCluster(3); FS = TEST_UTIL.getDFSCluster().getFileSystem(); + testDirOnTestFs = new Path("/" + name.getMethodName().replaceAll("[^0-9a-zA-Z]", "_")); + FS.mkdirs(testDirOnTestFs); + if (useTransparentEncryption) { + createEncryptionZone(); + } } @After @@ -190,7 +251,7 @@ public class TestSaslFanOutOneBlockAsyncDFSOutput { } private Path getTestFile() { - return new Path("/" + name.getMethodName().replaceAll("[^0-9a-zA-Z]", "_")); + return new Path(testDirOnTestFs, "test"); } @Test