HBASE-15743 Add Transparent Data Encryption support for FanOutOneBlockAsyncDFSOutput
This commit is contained in:
parent
532b914f08
commit
0d252918f6
|
@ -43,6 +43,7 @@ import io.netty.util.concurrent.FutureListener;
|
|||
import io.netty.util.concurrent.Promise;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.channels.CompletionHandler;
|
||||
import java.util.ArrayDeque;
|
||||
import java.util.Collection;
|
||||
|
@ -58,6 +59,7 @@ import org.apache.hadoop.conf.Configuration;
|
|||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.CancelOnClose;
|
||||
import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper.CryptoCodec;
|
||||
import org.apache.hadoop.hbase.util.CancelableProgressable;
|
||||
import org.apache.hadoop.hbase.util.FSUtils;
|
||||
import org.apache.hadoop.hdfs.DFSClient;
|
||||
|
@ -119,6 +121,8 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
|
|||
|
||||
private final LocatedBlock locatedBlock;
|
||||
|
||||
private final CryptoCodec cryptoCodec;
|
||||
|
||||
private final EventLoop eventLoop;
|
||||
|
||||
private final List<Channel> datanodeList;
|
||||
|
@ -317,8 +321,8 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
|
|||
|
||||
FanOutOneBlockAsyncDFSOutput(Configuration conf, FSUtils fsUtils, DistributedFileSystem dfs,
|
||||
DFSClient client, ClientProtocol namenode, String clientName, String src, long fileId,
|
||||
LocatedBlock locatedBlock, EventLoop eventLoop, List<Channel> datanodeList,
|
||||
DataChecksum summer, ByteBufAllocator alloc) {
|
||||
LocatedBlock locatedBlock, CryptoCodec cryptoCodec, EventLoop eventLoop,
|
||||
List<Channel> datanodeList, DataChecksum summer, ByteBufAllocator alloc) {
|
||||
this.conf = conf;
|
||||
this.fsUtils = fsUtils;
|
||||
this.dfs = dfs;
|
||||
|
@ -328,6 +332,7 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
|
|||
this.clientName = clientName;
|
||||
this.src = src;
|
||||
this.locatedBlock = locatedBlock;
|
||||
this.cryptoCodec = cryptoCodec;
|
||||
this.eventLoop = eventLoop;
|
||||
this.datanodeList = datanodeList;
|
||||
this.summer = summer;
|
||||
|
@ -342,16 +347,27 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
|
|||
write(b, 0, b.length);
|
||||
}
|
||||
|
||||
private void write0(byte[] b, final int off, final int len) {
|
||||
buf.ensureWritable(len);
|
||||
if (cryptoCodec == null) {
|
||||
buf.writeBytes(b, off, len);
|
||||
} else {
|
||||
ByteBuffer inBuffer = ByteBuffer.wrap(b, off, len);
|
||||
cryptoCodec.encrypt(inBuffer, buf.nioBuffer(buf.writerIndex(), len));
|
||||
buf.writerIndex(buf.writerIndex() + len);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(final byte[] b, final int off, final int len) {
|
||||
if (eventLoop.inEventLoop()) {
|
||||
buf.ensureWritable(len).writeBytes(b, off, len);
|
||||
write0(b, off, len);
|
||||
} else {
|
||||
eventLoop.submit(new Runnable() {
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
buf.ensureWritable(len).writeBytes(b, off, len);
|
||||
write0(b, off, len);
|
||||
}
|
||||
}).syncUninterruptibly();
|
||||
}
|
||||
|
|
|
@ -21,6 +21,7 @@ import static io.netty.channel.ChannelOption.CONNECT_TIMEOUT_MILLIS;
|
|||
import static io.netty.handler.timeout.IdleState.READER_IDLE;
|
||||
import static org.apache.hadoop.fs.CreateFlag.CREATE;
|
||||
import static org.apache.hadoop.fs.CreateFlag.OVERWRITE;
|
||||
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper.createCryptoCodec;
|
||||
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper.trySaslNegotiate;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME;
|
||||
|
@ -73,6 +74,7 @@ import org.apache.hadoop.fs.UnresolvedLinkException;
|
|||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.client.ConnectionUtils;
|
||||
import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper.CryptoCodec;
|
||||
import org.apache.hadoop.hbase.util.CancelableProgressable;
|
||||
import org.apache.hadoop.hbase.util.FSUtils;
|
||||
import org.apache.hadoop.hdfs.DFSClient;
|
||||
|
@ -672,9 +674,12 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
|
|||
// layer should retry itself if needed.
|
||||
datanodeList.add(future.syncUninterruptibly().getNow());
|
||||
}
|
||||
CryptoCodec cryptocodec = createCryptoCodec(conf, stat, client);
|
||||
FanOutOneBlockAsyncDFSOutput output = new FanOutOneBlockAsyncDFSOutput(conf, fsUtils, dfs,
|
||||
client, namenode, clientName, src, stat.getFileId(), locatedBlock, cryptocodec, eventLoop,
|
||||
datanodeList, summer, ALLOC);
|
||||
succ = true;
|
||||
return new FanOutOneBlockAsyncDFSOutput(conf, fsUtils, dfs, client, namenode, clientName,
|
||||
src, stat.getFileId(), locatedBlock, eventLoop, datanodeList, summer, ALLOC);
|
||||
return output;
|
||||
} finally {
|
||||
if (!succ) {
|
||||
if (futureList != null) {
|
||||
|
|
|
@ -21,6 +21,7 @@ import static io.netty.handler.timeout.IdleState.READER_IDLE;
|
|||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Charsets;
|
||||
import com.google.common.base.Throwables;
|
||||
import com.google.common.collect.ImmutableSet;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
|
@ -79,6 +80,7 @@ import org.apache.hadoop.conf.Configuration;
|
|||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hdfs.DFSClient;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
||||
import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
|
||||
import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto;
|
||||
|
@ -111,7 +113,7 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
|
|||
|
||||
@VisibleForTesting
|
||||
static final String DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY =
|
||||
"dfs.encrypt.data.transfer.cipher.suites";
|
||||
"dfs.encrypt.data.transfer.cipher.suites";
|
||||
|
||||
@VisibleForTesting
|
||||
static final String AES_CTR_NOPADDING = "AES/CTR/NoPadding";
|
||||
|
@ -129,7 +131,7 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
|
|||
|
||||
private static final SaslAdaptor SASL_ADAPTOR;
|
||||
|
||||
private interface CipherHelper {
|
||||
private interface CipherOptionHelper {
|
||||
|
||||
List<Object> getCipherOptions(Configuration conf) throws IOException;
|
||||
|
||||
|
@ -150,9 +152,19 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
|
|||
byte[] getOutIv(Object cipherOption);
|
||||
}
|
||||
|
||||
private static final CipherHelper CIPHER_HELPER;
|
||||
private static final CipherOptionHelper CIPHER_OPTION_HELPER;
|
||||
|
||||
private static final class CryptoCodec {
|
||||
private interface TransparentCryptoHelper {
|
||||
|
||||
Object getFileEncryptionInfo(HdfsFileStatus stat);
|
||||
|
||||
CryptoCodec createCryptoCodec(Configuration conf, Object feInfo, DFSClient client)
|
||||
throws IOException;
|
||||
}
|
||||
|
||||
private static final TransparentCryptoHelper TRANSPARENT_CRYPTO_HELPER;
|
||||
|
||||
static final class CryptoCodec {
|
||||
|
||||
private static final Method CREATE_CODEC;
|
||||
|
||||
|
@ -215,23 +227,34 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
|
|||
private final Object decryptor;
|
||||
|
||||
public CryptoCodec(Configuration conf, Object cipherOption) {
|
||||
Object codec;
|
||||
try {
|
||||
codec = CREATE_CODEC.invoke(null, conf, CIPHER_HELPER.getCipherSuite(cipherOption));
|
||||
Object codec = CREATE_CODEC.invoke(null, conf,
|
||||
CIPHER_OPTION_HELPER.getCipherSuite(cipherOption));
|
||||
encryptor = CREATE_ENCRYPTOR.invoke(codec);
|
||||
byte[] encKey = CIPHER_HELPER.getInKey(cipherOption);
|
||||
byte[] encIv = CIPHER_HELPER.getInIv(cipherOption);
|
||||
byte[] encKey = CIPHER_OPTION_HELPER.getInKey(cipherOption);
|
||||
byte[] encIv = CIPHER_OPTION_HELPER.getInIv(cipherOption);
|
||||
INIT_ENCRYPTOR.invoke(encryptor, encKey, Arrays.copyOf(encIv, encIv.length));
|
||||
|
||||
decryptor = CREATE_DECRYPTOR.invoke(codec);
|
||||
byte[] decKey = CIPHER_HELPER.getOutKey(cipherOption);
|
||||
byte[] decIv = CIPHER_HELPER.getOutIv(cipherOption);
|
||||
byte[] decKey = CIPHER_OPTION_HELPER.getOutKey(cipherOption);
|
||||
byte[] decIv = CIPHER_OPTION_HELPER.getOutIv(cipherOption);
|
||||
INIT_DECRYPTOR.invoke(decryptor, decKey, Arrays.copyOf(decIv, decIv.length));
|
||||
} catch (IllegalAccessException | InvocationTargetException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
public CryptoCodec(Configuration conf, Object cipherSuite, byte[] encKey, byte[] encIv) {
|
||||
try {
|
||||
Object codec = CREATE_CODEC.invoke(null, conf, cipherSuite);
|
||||
encryptor = CREATE_ENCRYPTOR.invoke(codec);
|
||||
INIT_ENCRYPTOR.invoke(encryptor, encKey, encIv);
|
||||
decryptor = null;
|
||||
} catch (IllegalAccessException | InvocationTargetException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
public void encrypt(ByteBuffer inBuffer, ByteBuffer outBuffer) {
|
||||
try {
|
||||
ENCRYPT.invoke(encryptor, inBuffer, outBuffer);
|
||||
|
@ -251,17 +274,17 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
|
|||
|
||||
private static SaslAdaptor createSaslAdaptor27(Class<?> saslDataTransferClientClass)
|
||||
throws NoSuchFieldException, NoSuchMethodException {
|
||||
final Field saslPropsResolverField =
|
||||
saslDataTransferClientClass.getDeclaredField("saslPropsResolver");
|
||||
final Field saslPropsResolverField = saslDataTransferClientClass
|
||||
.getDeclaredField("saslPropsResolver");
|
||||
saslPropsResolverField.setAccessible(true);
|
||||
final Field trustedChannelResolverField =
|
||||
saslDataTransferClientClass.getDeclaredField("trustedChannelResolver");
|
||||
final Field trustedChannelResolverField = saslDataTransferClientClass
|
||||
.getDeclaredField("trustedChannelResolver");
|
||||
trustedChannelResolverField.setAccessible(true);
|
||||
final Field fallbackToSimpleAuthField =
|
||||
saslDataTransferClientClass.getDeclaredField("fallbackToSimpleAuth");
|
||||
final Field fallbackToSimpleAuthField = saslDataTransferClientClass
|
||||
.getDeclaredField("fallbackToSimpleAuth");
|
||||
fallbackToSimpleAuthField.setAccessible(true);
|
||||
final Method getSaslDataTransferClientMethod =
|
||||
DFSClient.class.getMethod("getSaslDataTransferClient");
|
||||
final Method getSaslDataTransferClientMethod = DFSClient.class
|
||||
.getMethod("getSaslDataTransferClient");
|
||||
final Method newDataEncryptionKeyMethod = DFSClient.class.getMethod("newDataEncryptionKey");
|
||||
return new SaslAdaptor() {
|
||||
|
||||
|
@ -288,8 +311,8 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
|
|||
@Override
|
||||
public AtomicBoolean getFallbackToSimpleAuth(DFSClient client) {
|
||||
try {
|
||||
return (AtomicBoolean) fallbackToSimpleAuthField.get(getSaslDataTransferClientMethod
|
||||
.invoke(client));
|
||||
return (AtomicBoolean) fallbackToSimpleAuthField
|
||||
.get(getSaslDataTransferClientMethod.invoke(client));
|
||||
} catch (IllegalAccessException | InvocationTargetException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
|
@ -308,8 +331,8 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
|
|||
|
||||
private static SaslAdaptor createSaslAdaptor25() {
|
||||
try {
|
||||
final Field trustedChannelResolverField =
|
||||
DFSClient.class.getDeclaredField("trustedChannelResolver");
|
||||
final Field trustedChannelResolverField = DFSClient.class
|
||||
.getDeclaredField("trustedChannelResolver");
|
||||
trustedChannelResolverField.setAccessible(true);
|
||||
final Method getDataEncryptionKeyMethod = DFSClient.class.getMethod("getDataEncryptionKey");
|
||||
return new SaslAdaptor() {
|
||||
|
@ -351,8 +374,8 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
|
|||
private static SaslAdaptor createSaslAdaptor() {
|
||||
Class<?> saslDataTransferClientClass = null;
|
||||
try {
|
||||
saslDataTransferClientClass =
|
||||
Class.forName("org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient");
|
||||
saslDataTransferClientClass = Class
|
||||
.forName("org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient");
|
||||
} catch (ClassNotFoundException e) {
|
||||
LOG.warn("No SaslDataTransferClient class found, should be hadoop 2.5-");
|
||||
}
|
||||
|
@ -364,8 +387,8 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
|
|||
}
|
||||
}
|
||||
|
||||
private static CipherHelper createCipherHelper25() {
|
||||
return new CipherHelper() {
|
||||
private static CipherOptionHelper createCipherHelper25() {
|
||||
return new CipherOptionHelper() {
|
||||
|
||||
@Override
|
||||
public byte[] getOutKey(Object cipherOption) {
|
||||
|
@ -410,18 +433,17 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
|
|||
};
|
||||
}
|
||||
|
||||
private static CipherHelper createCipherHelper27(Class<?> cipherOptionClass)
|
||||
private static CipherOptionHelper createCipherHelper27(Class<?> cipherOptionClass)
|
||||
throws ClassNotFoundException, NoSuchMethodException {
|
||||
@SuppressWarnings("rawtypes")
|
||||
Class<? extends Enum> cipherSuiteClass =
|
||||
Class.forName("org.apache.hadoop.crypto.CipherSuite").asSubclass(Enum.class);
|
||||
Class<? extends Enum> cipherSuiteClass = Class.forName("org.apache.hadoop.crypto.CipherSuite")
|
||||
.asSubclass(Enum.class);
|
||||
@SuppressWarnings("unchecked")
|
||||
final Enum<?> aesCipherSuite = Enum.valueOf(cipherSuiteClass, "AES_CTR_NOPADDING");
|
||||
final Constructor<?> cipherOptionConstructor =
|
||||
cipherOptionClass.getConstructor(cipherSuiteClass);
|
||||
final Constructor<?> cipherOptionWithKeyAndIvConstructor =
|
||||
cipherOptionClass.getConstructor(cipherSuiteClass, byte[].class, byte[].class,
|
||||
byte[].class, byte[].class);
|
||||
final Constructor<?> cipherOptionConstructor = cipherOptionClass
|
||||
.getConstructor(cipherSuiteClass);
|
||||
final Constructor<?> cipherOptionWithKeyAndIvConstructor = cipherOptionClass
|
||||
.getConstructor(cipherSuiteClass, byte[].class, byte[].class, byte[].class, byte[].class);
|
||||
|
||||
final Method getCipherSuiteMethod = cipherOptionClass.getMethod("getCipherSuite");
|
||||
final Method getInKeyMethod = cipherOptionClass.getMethod("getInKey");
|
||||
|
@ -429,16 +451,15 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
|
|||
final Method getOutKeyMethod = cipherOptionClass.getMethod("getOutKey");
|
||||
final Method getOutIvMethod = cipherOptionClass.getMethod("getOutIv");
|
||||
|
||||
final Method convertCipherOptionsMethod =
|
||||
PBHelper.class.getMethod("convertCipherOptions", List.class);
|
||||
final Method convertCipherOptionProtosMethod =
|
||||
PBHelper.class.getMethod("convertCipherOptionProtos", List.class);
|
||||
final Method addAllCipherOptionMethod =
|
||||
DataTransferEncryptorMessageProto.Builder.class.getMethod("addAllCipherOption",
|
||||
Iterable.class);
|
||||
final Method getCipherOptionListMethod =
|
||||
DataTransferEncryptorMessageProto.class.getMethod("getCipherOptionList");
|
||||
return new CipherHelper() {
|
||||
final Method convertCipherOptionsMethod = PBHelper.class.getMethod("convertCipherOptions",
|
||||
List.class);
|
||||
final Method convertCipherOptionProtosMethod = PBHelper.class
|
||||
.getMethod("convertCipherOptionProtos", List.class);
|
||||
final Method addAllCipherOptionMethod = DataTransferEncryptorMessageProto.Builder.class
|
||||
.getMethod("addAllCipherOption", Iterable.class);
|
||||
final Method getCipherOptionListMethod = DataTransferEncryptorMessageProto.class
|
||||
.getMethod("getCipherOptionList");
|
||||
return new CipherOptionHelper() {
|
||||
|
||||
@Override
|
||||
public byte[] getOutKey(Object cipherOption) {
|
||||
|
@ -532,9 +553,8 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
|
|||
boolean isNegotiatedQopPrivacy, SaslClient saslClient) throws IOException {
|
||||
List<Object> cipherOptions;
|
||||
try {
|
||||
cipherOptions =
|
||||
(List<Object>) convertCipherOptionProtosMethod.invoke(null,
|
||||
getCipherOptionListMethod.invoke(proto));
|
||||
cipherOptions = (List<Object>) convertCipherOptionProtosMethod.invoke(null,
|
||||
getCipherOptionListMethod.invoke(proto));
|
||||
} catch (IllegalAccessException | InvocationTargetException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
|
@ -557,7 +577,7 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
|
|||
};
|
||||
}
|
||||
|
||||
private static CipherHelper createCipherHelper() {
|
||||
private static CipherOptionHelper createCipherHelper() {
|
||||
Class<?> cipherOptionClass;
|
||||
try {
|
||||
cipherOptionClass = Class.forName("org.apache.hadoop.crypto.CipherOption");
|
||||
|
@ -572,9 +592,79 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
|
|||
}
|
||||
}
|
||||
|
||||
private static TransparentCryptoHelper createTransparentCryptoHelper25() {
|
||||
return new TransparentCryptoHelper() {
|
||||
|
||||
@Override
|
||||
public Object getFileEncryptionInfo(HdfsFileStatus stat) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public CryptoCodec createCryptoCodec(Configuration conf, Object feInfo, DFSClient client) {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
private static TransparentCryptoHelper createTransparentCryptoHelper27(Class<?> feInfoClass)
|
||||
throws NoSuchMethodException, ClassNotFoundException {
|
||||
final Method getFileEncryptionInfoMethod = HdfsFileStatus.class
|
||||
.getMethod("getFileEncryptionInfo");
|
||||
final Method decryptEncryptedDataEncryptionKeyMethod = DFSClient.class
|
||||
.getDeclaredMethod("decryptEncryptedDataEncryptionKey", feInfoClass);
|
||||
decryptEncryptedDataEncryptionKeyMethod.setAccessible(true);
|
||||
final Method getCipherSuiteMethod = feInfoClass.getMethod("getCipherSuite");
|
||||
Class<?> keyVersionClass = Class.forName("org.apache.hadoop.crypto.key.KeyProvider$KeyVersion");
|
||||
final Method getMaterialMethod = keyVersionClass.getMethod("getMaterial");
|
||||
final Method getIVMethod = feInfoClass.getMethod("getIV");
|
||||
return new TransparentCryptoHelper() {
|
||||
|
||||
@Override
|
||||
public Object getFileEncryptionInfo(HdfsFileStatus stat) {
|
||||
try {
|
||||
return getFileEncryptionInfoMethod.invoke(stat);
|
||||
} catch (IllegalAccessException | InvocationTargetException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public CryptoCodec createCryptoCodec(Configuration conf, Object feInfo, DFSClient client)
|
||||
throws IOException {
|
||||
try {
|
||||
Object decrypted = decryptEncryptedDataEncryptionKeyMethod.invoke(client, feInfo);
|
||||
return new CryptoCodec(conf, getCipherSuiteMethod.invoke(feInfo),
|
||||
(byte[]) getMaterialMethod.invoke(decrypted), (byte[]) getIVMethod.invoke(feInfo));
|
||||
} catch (InvocationTargetException e) {
|
||||
Throwables.propagateIfPossible(e.getTargetException(), IOException.class);
|
||||
throw new RuntimeException(e.getTargetException());
|
||||
} catch (IllegalAccessException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
private static TransparentCryptoHelper createTransparentCryptoHelper() {
|
||||
Class<?> feInfoClass;
|
||||
try {
|
||||
feInfoClass = Class.forName("org.apache.hadoop.fs.FileEncryptionInfo");
|
||||
} catch (ClassNotFoundException e) {
|
||||
LOG.warn("No FileEncryptionInfo class found, should be hadoop 2.5-");
|
||||
return createTransparentCryptoHelper25();
|
||||
}
|
||||
try {
|
||||
return createTransparentCryptoHelper27(feInfoClass);
|
||||
} catch (NoSuchMethodException | ClassNotFoundException e) {
|
||||
throw new Error(e);
|
||||
}
|
||||
}
|
||||
|
||||
static {
|
||||
SASL_ADAPTOR = createSaslAdaptor();
|
||||
CIPHER_HELPER = createCipherHelper();
|
||||
CIPHER_OPTION_HELPER = createCipherHelper();
|
||||
TRANSPARENT_CRYPTO_HELPER = createTransparentCryptoHelper();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -643,9 +733,8 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
|
|||
Map<String, String> saslProps, int timeoutMs, Promise<Void> promise) throws SaslException {
|
||||
this.conf = conf;
|
||||
this.saslProps = saslProps;
|
||||
this.saslClient =
|
||||
Sasl.createSaslClient(new String[] { MECHANISM }, username, PROTOCOL, SERVER_NAME,
|
||||
saslProps, new SaslClientCallbackHandler(username, password));
|
||||
this.saslClient = Sasl.createSaslClient(new String[] { MECHANISM }, username, PROTOCOL,
|
||||
SERVER_NAME, saslProps, new SaslClientCallbackHandler(username, password));
|
||||
this.timeoutMs = timeoutMs;
|
||||
this.promise = promise;
|
||||
}
|
||||
|
@ -656,14 +745,14 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
|
|||
|
||||
private void sendSaslMessage(ChannelHandlerContext ctx, byte[] payload, List<Object> options)
|
||||
throws IOException {
|
||||
DataTransferEncryptorMessageProto.Builder builder =
|
||||
DataTransferEncryptorMessageProto.newBuilder();
|
||||
DataTransferEncryptorMessageProto.Builder builder = DataTransferEncryptorMessageProto
|
||||
.newBuilder();
|
||||
builder.setStatus(DataTransferEncryptorStatus.SUCCESS);
|
||||
if (payload != null) {
|
||||
builder.setPayload(ByteString.copyFrom(payload));
|
||||
}
|
||||
if (options != null) {
|
||||
CIPHER_HELPER.addCipherOptions(builder, options);
|
||||
CIPHER_OPTION_HELPER.addCipherOptions(builder, options);
|
||||
}
|
||||
DataTransferEncryptorMessageProto proto = builder.build();
|
||||
int size = proto.getSerializedSize();
|
||||
|
@ -704,8 +793,8 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
|
|||
}
|
||||
|
||||
private boolean requestedQopContainsPrivacy() {
|
||||
Set<String> requestedQop =
|
||||
ImmutableSet.copyOf(Arrays.asList(saslProps.get(Sasl.QOP).split(",")));
|
||||
Set<String> requestedQop = ImmutableSet
|
||||
.copyOf(Arrays.asList(saslProps.get(Sasl.QOP).split(",")));
|
||||
return requestedQop.contains("auth-conf");
|
||||
}
|
||||
|
||||
|
@ -713,15 +802,16 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
|
|||
if (!saslClient.isComplete()) {
|
||||
throw new IOException("Failed to complete SASL handshake");
|
||||
}
|
||||
Set<String> requestedQop =
|
||||
ImmutableSet.copyOf(Arrays.asList(saslProps.get(Sasl.QOP).split(",")));
|
||||
Set<String> requestedQop = ImmutableSet
|
||||
.copyOf(Arrays.asList(saslProps.get(Sasl.QOP).split(",")));
|
||||
String negotiatedQop = getNegotiatedQop();
|
||||
LOG.debug("Verifying QOP, requested QOP = " + requestedQop + ", negotiated QOP = "
|
||||
+ negotiatedQop);
|
||||
LOG.debug(
|
||||
"Verifying QOP, requested QOP = " + requestedQop + ", negotiated QOP = " + negotiatedQop);
|
||||
if (!requestedQop.contains(negotiatedQop)) {
|
||||
throw new IOException(String.format("SASL handshake completed, but "
|
||||
+ "channel does not have acceptable quality of protection, "
|
||||
+ "requested = %s, negotiated = %s", requestedQop, negotiatedQop));
|
||||
+ "requested = %s, negotiated = %s",
|
||||
requestedQop, negotiatedQop));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -741,7 +831,7 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
|
|||
case 1: {
|
||||
List<Object> cipherOptions = null;
|
||||
if (requestedQopContainsPrivacy()) {
|
||||
cipherOptions = CIPHER_HELPER.getCipherOptions(conf);
|
||||
cipherOptions = CIPHER_OPTION_HELPER.getCipherOptions(conf);
|
||||
}
|
||||
sendSaslMessage(ctx, response, cipherOptions);
|
||||
ctx.flush();
|
||||
|
@ -752,7 +842,7 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
|
|||
assert response == null;
|
||||
checkSaslComplete();
|
||||
Object cipherOption =
|
||||
CIPHER_HELPER.getCipherOption(proto, isNegotiatedQopPrivacy(), saslClient);
|
||||
CIPHER_OPTION_HELPER.getCipherOption(proto, isNegotiatedQopPrivacy(), saslClient);
|
||||
ChannelPipeline p = ctx.pipeline();
|
||||
while (p.first() != null) {
|
||||
p.removeFirst();
|
||||
|
@ -762,8 +852,9 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
|
|||
p.addLast(new EncryptHandler(codec), new DecryptHandler(codec));
|
||||
} else {
|
||||
if (useWrap()) {
|
||||
p.addLast(new SaslWrapHandler(saslClient), new LengthFieldBasedFrameDecoder(
|
||||
Integer.MAX_VALUE, 0, 4), new SaslUnwrapHandler(saslClient));
|
||||
p.addLast(new SaslWrapHandler(saslClient),
|
||||
new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4),
|
||||
new SaslUnwrapHandler(saslClient));
|
||||
}
|
||||
}
|
||||
promise.trySuccess(null);
|
||||
|
@ -992,8 +1083,8 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
|
|||
}
|
||||
if (encryptionKey != null) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("SASL client doing encrypted handshake for addr = " + addr + ", datanodeId = "
|
||||
+ dnInfo);
|
||||
LOG.debug(
|
||||
"SASL client doing encrypted handshake for addr = " + addr + ", datanodeId = " + dnInfo);
|
||||
}
|
||||
doSaslNegotiation(conf, channel, timeoutMs, getUserNameFromEncryptionKey(encryptionKey),
|
||||
encryptionKeyToPassword(encryptionKey.encryptionKey),
|
||||
|
@ -1018,8 +1109,8 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
|
|||
saslPromise.trySuccess(null);
|
||||
} else if (saslPropsResolver != null) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("SASL client doing general handshake for addr = " + addr + ", datanodeId = "
|
||||
+ dnInfo);
|
||||
LOG.debug(
|
||||
"SASL client doing general handshake for addr = " + addr + ", datanodeId = " + dnInfo);
|
||||
}
|
||||
doSaslNegotiation(conf, channel, timeoutMs, buildUsername(accessToken),
|
||||
buildClientPassword(accessToken), saslPropsResolver.getClientProperties(addr), saslPromise);
|
||||
|
@ -1035,4 +1126,12 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
|
|||
}
|
||||
}
|
||||
|
||||
static CryptoCodec createCryptoCodec(Configuration conf, HdfsFileStatus stat, DFSClient client)
|
||||
throws IOException {
|
||||
Object feInfo = TRANSPARENT_CRYPTO_HELPER.getFileEncryptionInfo(stat);
|
||||
if (feInfo == null) {
|
||||
return null;
|
||||
}
|
||||
return TRANSPARENT_CRYPTO_HELPER.createCryptoCodec(conf, feInfo, client);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -29,6 +29,8 @@ import io.netty.channel.nio.NioEventLoopGroup;
|
|||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.lang.reflect.Method;
|
||||
import java.net.URI;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
|
@ -42,7 +44,7 @@ import org.apache.hadoop.hbase.HBaseTestingUtility;
|
|||
import org.apache.hadoop.hbase.http.ssl.KeyStoreTestUtil;
|
||||
import org.apache.hadoop.hbase.security.HBaseKerberosUtils;
|
||||
import org.apache.hadoop.hbase.security.token.TestGenerateDelegationToken;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
||||
import org.apache.hadoop.hbase.testclassification.MiscTests;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||
|
@ -63,7 +65,7 @@ import org.junit.runners.Parameterized.Parameter;
|
|||
import org.junit.runners.Parameterized.Parameters;
|
||||
|
||||
@RunWith(Parameterized.class)
|
||||
@Category({ MiscTests.class, MediumTests.class })
|
||||
@Category({ MiscTests.class, LargeTests.class })
|
||||
public class TestSaslFanOutOneBlockAsyncDFSOutput {
|
||||
|
||||
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
|
||||
|
@ -74,8 +76,8 @@ public class TestSaslFanOutOneBlockAsyncDFSOutput {
|
|||
|
||||
private static int READ_TIMEOUT_MS = 200000;
|
||||
|
||||
private static final File KEYTAB_FILE = new File(
|
||||
TEST_UTIL.getDataTestDir("keytab").toUri().getPath());
|
||||
private static final File KEYTAB_FILE =
|
||||
new File(TEST_UTIL.getDataTestDir("keytab").toUri().getPath());
|
||||
|
||||
private static MiniKdc KDC;
|
||||
|
||||
|
@ -86,6 +88,11 @@ public class TestSaslFanOutOneBlockAsyncDFSOutput {
|
|||
private static String PRINCIPAL;
|
||||
|
||||
private static String HTTP_PRINCIPAL;
|
||||
|
||||
private static String TEST_KEY_NAME = "test_key";
|
||||
|
||||
private static boolean TEST_TRANSPARENT_ENCRYPTION = true;
|
||||
|
||||
@Rule
|
||||
public TestName name = new TestName();
|
||||
|
||||
|
@ -98,13 +105,20 @@ public class TestSaslFanOutOneBlockAsyncDFSOutput {
|
|||
@Parameter(2)
|
||||
public String cipherSuite;
|
||||
|
||||
@Parameters(name = "{index}: protection={0}, encryption={1}, cipherSuite={2}")
|
||||
@Parameter(3)
|
||||
public boolean useTransparentEncryption;
|
||||
|
||||
@Parameters(
|
||||
name = "{index}: protection={0}, encryption={1}, cipherSuite={2}, transparent_enc={3}")
|
||||
public static Iterable<Object[]> data() {
|
||||
List<Object[]> params = new ArrayList<>();
|
||||
for (String protection : Arrays.asList("authentication", "integrity", "privacy")) {
|
||||
for (String encryptionAlgorithm : Arrays.asList("", "3des", "rc4")) {
|
||||
for (String cipherSuite : Arrays.asList("", AES_CTR_NOPADDING)) {
|
||||
params.add(new Object[] { protection, encryptionAlgorithm, cipherSuite });
|
||||
for (boolean useTransparentEncryption : Arrays.asList(false, true)) {
|
||||
params.add(new Object[] { protection, encryptionAlgorithm, cipherSuite,
|
||||
useTransparentEncryption });
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -132,6 +146,35 @@ public class TestSaslFanOutOneBlockAsyncDFSOutput {
|
|||
conf.setBoolean("ignore.secure.ports.for.testing", true);
|
||||
}
|
||||
|
||||
private static void setUpKeyProvider(Configuration conf) throws Exception {
|
||||
Class<?> keyProviderFactoryClass;
|
||||
try {
|
||||
keyProviderFactoryClass = Class.forName("org.apache.hadoop.crypto.key.KeyProviderFactory");
|
||||
} catch (ClassNotFoundException e) {
|
||||
// should be hadoop 2.5-, give up
|
||||
TEST_TRANSPARENT_ENCRYPTION = false;
|
||||
return;
|
||||
}
|
||||
|
||||
URI keyProviderUri =
|
||||
new URI("jceks://file" + TEST_UTIL.getDataTestDir("test.jks").toUri().toString());
|
||||
conf.set("dfs.encryption.key.provider.uri", keyProviderUri.toString());
|
||||
Method getKeyProviderMethod =
|
||||
keyProviderFactoryClass.getMethod("get", URI.class, Configuration.class);
|
||||
Object keyProvider = getKeyProviderMethod.invoke(null, keyProviderUri, conf);
|
||||
Class<?> keyProviderClass = Class.forName("org.apache.hadoop.crypto.key.KeyProvider");
|
||||
Class<?> keyProviderOptionsClass =
|
||||
Class.forName("org.apache.hadoop.crypto.key.KeyProvider$Options");
|
||||
Method createKeyMethod =
|
||||
keyProviderClass.getMethod("createKey", String.class, keyProviderOptionsClass);
|
||||
Object options = keyProviderOptionsClass.getConstructor(Configuration.class).newInstance(conf);
|
||||
createKeyMethod.invoke(keyProvider, TEST_KEY_NAME, options);
|
||||
Method flushMethod = keyProviderClass.getMethod("flush");
|
||||
flushMethod.invoke(keyProvider);
|
||||
Method closeMethod = keyProviderClass.getMethod("close");
|
||||
closeMethod.invoke(keyProvider);
|
||||
}
|
||||
|
||||
@BeforeClass
|
||||
public static void setUpBeforeClass() throws Exception {
|
||||
EVENT_LOOP_GROUP = new NioEventLoopGroup();
|
||||
|
@ -144,6 +187,8 @@ public class TestSaslFanOutOneBlockAsyncDFSOutput {
|
|||
PRINCIPAL = USERNAME + "/" + HOST;
|
||||
HTTP_PRINCIPAL = "HTTP/" + HOST;
|
||||
KDC.createPrincipal(KEYTAB_FILE, PRINCIPAL, HTTP_PRINCIPAL);
|
||||
|
||||
setUpKeyProvider(TEST_UTIL.getConfiguration());
|
||||
setHdfsSecuredConfiguration(TEST_UTIL.getConfiguration());
|
||||
HBaseKerberosUtils.setKeytabFileForTesting(KEYTAB_FILE.getAbsolutePath());
|
||||
HBaseKerberosUtils.setPrincipalForTesting(PRINCIPAL + "@" + KDC.getRealm());
|
||||
|
@ -161,6 +206,17 @@ public class TestSaslFanOutOneBlockAsyncDFSOutput {
|
|||
}
|
||||
}
|
||||
|
||||
private Path testDirOnTestFs;
|
||||
|
||||
private void createEncryptionZone() throws Exception {
|
||||
if (!TEST_TRANSPARENT_ENCRYPTION) {
|
||||
return;
|
||||
}
|
||||
Method method =
|
||||
DistributedFileSystem.class.getMethod("createEncryptionZone", Path.class, String.class);
|
||||
method.invoke(FS, testDirOnTestFs, TEST_KEY_NAME);
|
||||
}
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
TEST_UTIL.getConfiguration().set("dfs.data.transfer.protection", protection);
|
||||
|
@ -182,6 +238,11 @@ public class TestSaslFanOutOneBlockAsyncDFSOutput {
|
|||
|
||||
TEST_UTIL.startMiniDFSCluster(3);
|
||||
FS = TEST_UTIL.getDFSCluster().getFileSystem();
|
||||
testDirOnTestFs = new Path("/" + name.getMethodName().replaceAll("[^0-9a-zA-Z]", "_"));
|
||||
FS.mkdirs(testDirOnTestFs);
|
||||
if (useTransparentEncryption) {
|
||||
createEncryptionZone();
|
||||
}
|
||||
}
|
||||
|
||||
@After
|
||||
|
@ -190,7 +251,7 @@ public class TestSaslFanOutOneBlockAsyncDFSOutput {
|
|||
}
|
||||
|
||||
private Path getTestFile() {
|
||||
return new Path("/" + name.getMethodName().replaceAll("[^0-9a-zA-Z]", "_"));
|
||||
return new Path(testDirOnTestFs, "test");
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
Loading…
Reference in New Issue