HBASE-22400 Remove the adapter code in async fs implementation for hadoop-2.7.x

Duo Zhang 2019-05-15 09:59:41 +08:00 committed by Apache9
parent cb32f4faf0
commit 930691a846
2 changed files with 38 additions and 356 deletions
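Summary of the change: the reflection-based adapter layer that kept the async WAL output working against several Hadoop client APIs (PipelineAckStatusGetter, StorageTypeSetter, BlockAdder, PBHelper, ChecksumCreater) is removed, and the helpers now call the hdfs-client classes directly. A minimal before/after sketch of the idea, using only names that appear in the diff below (this sketch is illustrative and is not part of the commit):

    // before: calls went through adapters resolved by reflection in a static initializer
    //   PIPELINE_ACK_STATUS_GETTER.get(ack);
    //   CHECKSUM_CREATER.createChecksum(client);
    // after: direct calls against the hdfs-client API
    Status status = PipelineAck.getStatusFromHeader(ack.getFlagList().get(0));
    DataChecksum summer = client.getConf().createChecksum(null);

The empty flag-list fallback omitted above is handled in the new getStatus(PipelineAckProto) further down in the diff.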

org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java

@@ -21,9 +21,9 @@ import static org.apache.hadoop.fs.CreateFlag.CREATE;
import static org.apache.hadoop.fs.CreateFlag.OVERWRITE;
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper.createEncryptor;
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper.trySaslNegotiate;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT;
import static org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage.PIPELINE_SETUP_CREATE;
import static org.apache.hbase.thirdparty.io.netty.channel.ChannelOption.CONNECT_TIMEOUT_MILLIS;
import static org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleState.READER_IDLE;
@@ -45,6 +45,7 @@ import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileSystemLinkResolver;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.fs.UnresolvedLinkException;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.client.ConnectionUtils;
@@ -63,6 +64,7 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
import org.apache.hadoop.hdfs.protocol.datatransfer.Op;
import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck.ECN;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BaseHeaderProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.CachingStrategyProto;
@@ -71,23 +73,19 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientOperationH
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.PipelineAckProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ExtendedBlockProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageTypeProto;
import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
import org.apache.hadoop.io.EnumSetWritable;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.DataChecksum;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
import org.apache.hbase.thirdparty.io.netty.bootstrap.Bootstrap;
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufAllocator;
@@ -136,35 +134,6 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
private static final DatanodeInfo[] EMPTY_DN_ARRAY = new DatanodeInfo[0];
// helper class for getting Status from PipelineAckProto. In hadoop 2.6 or before, there is a
// getStatus method, and for hadoop 2.7 or after, the status is retrieved from flag. The flag may
// get from proto directly, or combined by the reply field of the proto and a ECN object. See
// createPipelineAckStatusGetter for more details.
private interface PipelineAckStatusGetter {
Status get(PipelineAckProto ack);
}
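// With Hadoop 2.7.x support dropped this getter is no longer needed; the ack status is now read
// directly from the proto in the new getStatus(PipelineAckProto) further down in this diff.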
private static final PipelineAckStatusGetter PIPELINE_ACK_STATUS_GETTER;
// StorageType enum is placed under o.a.h.hdfs in hadoop 2.6 and o.a.h.fs in hadoop 2.7. So here
// we need to use reflection to set it.See createStorageTypeSetter for more details.
private interface StorageTypeSetter {
OpWriteBlockProto.Builder set(OpWriteBlockProto.Builder builder, Enum<?> storageType);
}
private static final StorageTypeSetter STORAGE_TYPE_SETTER;
// helper class for calling add block method on namenode. There is a addBlockFlags parameter for
// hadoop 2.8 or later. See createBlockAdder for more details.
private interface BlockAdder {
LocatedBlock addBlock(ClientProtocol namenode, String src, String clientName,
ExtendedBlock previous, DatanodeInfo[] excludeNodes, long fileId, String[] favoredNodes)
throws IOException;
}
private static final BlockAdder BLOCK_ADDER;
private interface LeaseManager {
void begin(DFSClient client, long inodeId);
@@ -183,23 +152,6 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
private static final DFSClientAdaptor DFS_CLIENT_ADAPTOR;
// helper class for convert protos.
private interface PBHelper {
ExtendedBlockProto convert(ExtendedBlock b);
TokenProto convert(Token<?> tok);
}
private static final PBHelper PB_HELPER;
// helper class for creating data checksum.
private interface ChecksumCreater {
DataChecksum createChecksum(DFSClient client);
}
private static final ChecksumCreater CHECKSUM_CREATER;
// helper class for creating files.
private interface FileCreator {
default HdfsFileStatus create(ClientProtocol instance, String src, FsPermission masked,
@@ -269,234 +221,6 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
};
}
private static PipelineAckStatusGetter createPipelineAckStatusGetter27()
throws NoSuchMethodException {
Method getFlagListMethod = PipelineAckProto.class.getMethod("getFlagList");
@SuppressWarnings("rawtypes")
Class<? extends Enum> ecnClass;
try {
ecnClass = Class.forName("org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck$ECN")
.asSubclass(Enum.class);
} catch (ClassNotFoundException e) {
String msg = "Couldn't properly initialize the PipelineAck.ECN class. Please " +
"update your WAL Provider to not make use of the 'asyncfs' provider. See " +
"HBASE-16110 for more information.";
LOG.error(msg, e);
throw new Error(msg, e);
}
@SuppressWarnings("unchecked")
Enum<?> disabledECN = Enum.valueOf(ecnClass, "DISABLED");
Method getReplyMethod = PipelineAckProto.class.getMethod("getReply", int.class);
Method combineHeaderMethod =
PipelineAck.class.getMethod("combineHeader", ecnClass, Status.class);
Method getStatusFromHeaderMethod =
PipelineAck.class.getMethod("getStatusFromHeader", int.class);
return new PipelineAckStatusGetter() {
@Override
public Status get(PipelineAckProto ack) {
try {
@SuppressWarnings("unchecked")
List<Integer> flagList = (List<Integer>) getFlagListMethod.invoke(ack);
Integer headerFlag;
if (flagList.isEmpty()) {
Status reply = (Status) getReplyMethod.invoke(ack, 0);
headerFlag = (Integer) combineHeaderMethod.invoke(null, disabledECN, reply);
} else {
headerFlag = flagList.get(0);
}
return (Status) getStatusFromHeaderMethod.invoke(null, headerFlag);
} catch (IllegalAccessException | InvocationTargetException e) {
throw new RuntimeException(e);
}
}
};
}
private static PipelineAckStatusGetter createPipelineAckStatusGetter26()
throws NoSuchMethodException {
Method getStatusMethod = PipelineAckProto.class.getMethod("getStatus", int.class);
return new PipelineAckStatusGetter() {
@Override
public Status get(PipelineAckProto ack) {
try {
return (Status) getStatusMethod.invoke(ack, 0);
} catch (IllegalAccessException | InvocationTargetException e) {
throw new RuntimeException(e);
}
}
};
}
private static PipelineAckStatusGetter createPipelineAckStatusGetter()
throws NoSuchMethodException {
try {
return createPipelineAckStatusGetter27();
} catch (NoSuchMethodException e) {
LOG.debug("Can not get expected method " + e.getMessage() +
", this usually because your Hadoop is pre 2.7.0, " +
"try the methods in Hadoop 2.6.x instead.");
}
return createPipelineAckStatusGetter26();
}
private static StorageTypeSetter createStorageTypeSetter() throws NoSuchMethodException {
Method setStorageTypeMethod =
OpWriteBlockProto.Builder.class.getMethod("setStorageType", StorageTypeProto.class);
ImmutableMap.Builder<String, StorageTypeProto> builder = ImmutableMap.builder();
for (StorageTypeProto storageTypeProto : StorageTypeProto.values()) {
builder.put(storageTypeProto.name(), storageTypeProto);
}
ImmutableMap<String, StorageTypeProto> name2ProtoEnum = builder.build();
return new StorageTypeSetter() {
@Override
public OpWriteBlockProto.Builder set(OpWriteBlockProto.Builder builder, Enum<?> storageType) {
Object protoEnum = name2ProtoEnum.get(storageType.name());
try {
setStorageTypeMethod.invoke(builder, protoEnum);
} catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
throw new RuntimeException(e);
}
return builder;
}
};
}
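// Replaced by a direct writeBlockProtoBuilder.setStorageType(PBHelperClient.convertStorageType(...))
// call in requestWriteBlock below, since StorageType can now always be referenced from
// org.apache.hadoop.fs directly.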
private static BlockAdder createBlockAdder() throws NoSuchMethodException {
for (Method method : ClientProtocol.class.getMethods()) {
if (method.getName().equals("addBlock")) {
Method addBlockMethod = method;
Class<?>[] paramTypes = addBlockMethod.getParameterTypes();
if (paramTypes[paramTypes.length - 1] == String[].class) {
return new BlockAdder() {
@Override
public LocatedBlock addBlock(ClientProtocol namenode, String src, String clientName,
ExtendedBlock previous, DatanodeInfo[] excludeNodes, long fileId,
String[] favoredNodes) throws IOException {
try {
return (LocatedBlock) addBlockMethod.invoke(namenode, src, clientName, previous,
excludeNodes, fileId, favoredNodes);
} catch (IllegalAccessException e) {
throw new RuntimeException(e);
} catch (InvocationTargetException e) {
Throwables.propagateIfPossible(e.getTargetException(), IOException.class);
throw new RuntimeException(e);
}
}
};
} else {
return new BlockAdder() {
@Override
public LocatedBlock addBlock(ClientProtocol namenode, String src, String clientName,
ExtendedBlock previous, DatanodeInfo[] excludeNodes, long fileId,
String[] favoredNodes) throws IOException {
try {
return (LocatedBlock) addBlockMethod.invoke(namenode, src, clientName, previous,
excludeNodes, fileId, favoredNodes, null);
} catch (IllegalAccessException e) {
throw new RuntimeException(e);
} catch (InvocationTargetException e) {
Throwables.propagateIfPossible(e.getTargetException(), IOException.class);
throw new RuntimeException(e);
}
}
};
}
}
}
throw new NoSuchMethodException("Can not find addBlock method in ClientProtocol");
}
private static PBHelper createPBHelper() throws NoSuchMethodException {
Class<?> helperClass;
String clazzName = "org.apache.hadoop.hdfs.protocolPB.PBHelperClient";
try {
helperClass = Class.forName(clazzName);
} catch (ClassNotFoundException e) {
helperClass = org.apache.hadoop.hdfs.protocolPB.PBHelper.class;
LOG.debug("" + clazzName + " not found (Hadoop is pre-2.8.0?); using " +
helperClass.toString() + " instead.");
}
Method convertEBMethod = helperClass.getMethod("convert", ExtendedBlock.class);
Method convertTokenMethod = helperClass.getMethod("convert", Token.class);
return new PBHelper() {
@Override
public ExtendedBlockProto convert(ExtendedBlock b) {
try {
return (ExtendedBlockProto) convertEBMethod.invoke(null, b);
} catch (IllegalAccessException | InvocationTargetException e) {
throw new RuntimeException(e);
}
}
@Override
public TokenProto convert(Token<?> tok) {
try {
return (TokenProto) convertTokenMethod.invoke(null, tok);
} catch (IllegalAccessException | InvocationTargetException e) {
throw new RuntimeException(e);
}
}
};
}
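// No longer needed: PBHelperClient is always available now and is called directly for the
// ExtendedBlock and Token conversions in connectToDataNodes below.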
private static ChecksumCreater createChecksumCreater28(Method getConfMethod, Class<?> confClass)
throws NoSuchMethodException {
for (Method method : confClass.getMethods()) {
if (method.getName().equals("createChecksum")) {
Method createChecksumMethod = method;
return new ChecksumCreater() {
@Override
public DataChecksum createChecksum(DFSClient client) {
try {
return (DataChecksum) createChecksumMethod.invoke(getConfMethod.invoke(client),
(Object) null);
} catch (IllegalAccessException | InvocationTargetException e) {
throw new RuntimeException(e);
}
}
};
}
}
throw new NoSuchMethodException("Can not find createChecksum method in DfsClientConf");
}
private static ChecksumCreater createChecksumCreater27(Method getConfMethod, Class<?> confClass)
throws NoSuchMethodException {
Method createChecksumMethod = confClass.getDeclaredMethod("createChecksum");
createChecksumMethod.setAccessible(true);
return new ChecksumCreater() {
@Override
public DataChecksum createChecksum(DFSClient client) {
try {
return (DataChecksum) createChecksumMethod.invoke(getConfMethod.invoke(client));
} catch (IllegalAccessException | InvocationTargetException e) {
throw new RuntimeException(e);
}
}
};
}
private static ChecksumCreater createChecksumCreater()
throws NoSuchMethodException, ClassNotFoundException {
Method getConfMethod = DFSClient.class.getMethod("getConf");
try {
return createChecksumCreater28(getConfMethod,
Class.forName("org.apache.hadoop.hdfs.client.impl.DfsClientConf"));
} catch (ClassNotFoundException e) {
LOG.debug("No DfsClientConf class found, should be hadoop 2.7-", e);
}
return createChecksumCreater27(getConfMethod,
Class.forName("org.apache.hadoop.hdfs.DFSClient$Conf"));
}
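// No longer needed: the new createChecksum below simply calls client.getConf().createChecksum(null)
// directly instead of probing for DfsClientConf via reflection.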
private static FileCreator createFileCreator3() throws NoSuchMethodException {
Method createMethod = ClientProtocol.class.getMethod("create", String.class, FsPermission.class,
String.class, EnumSetWritable.class, boolean.class, short.class, long.class,
@@ -547,13 +271,8 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
static {
try {
PIPELINE_ACK_STATUS_GETTER = createPipelineAckStatusGetter();
STORAGE_TYPE_SETTER = createStorageTypeSetter();
BLOCK_ADDER = createBlockAdder();
LEASE_MANAGER = createLeaseManager();
DFS_CLIENT_ADAPTOR = createDFSClientAdaptor();
PB_HELPER = createPBHelper();
CHECKSUM_CREATER = createChecksumCreater();
FILE_CREATOR = createFileCreator();
} catch (Exception e) {
String msg = "Couldn't properly initialize access to HDFS internals. Please " +
@@ -573,11 +292,19 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
}
static DataChecksum createChecksum(DFSClient client) {
return CHECKSUM_CREATER.createChecksum(client);
return client.getConf().createChecksum(null);
}
static Status getStatus(PipelineAckProto ack) {
return PIPELINE_ACK_STATUS_GETTER.get(ack);
List<Integer> flagList = ack.getFlagList();
Integer headerFlag;
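// Some datanodes leave the flag list empty; in that case rebuild the header flag from the plain
// reply status with ECN treated as disabled (the same behaviour the removed reflection getter had).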
if (flagList.isEmpty()) {
Status reply = ack.getReply(0);
headerFlag = PipelineAck.combineHeader(ECN.DISABLED, reply);
} else {
headerFlag = flagList.get(0);
}
return PipelineAck.getStatusFromHeader(headerFlag);
}
private static void processWriteBlockResponse(Channel channel, DatanodeInfo dnInfo,
@@ -641,9 +368,10 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
});
}
private static void requestWriteBlock(Channel channel, Enum<?> storageType,
private static void requestWriteBlock(Channel channel, StorageType storageType,
OpWriteBlockProto.Builder writeBlockProtoBuilder) throws IOException {
OpWriteBlockProto proto = STORAGE_TYPE_SETTER.set(writeBlockProtoBuilder, storageType).build();
OpWriteBlockProto proto =
writeBlockProtoBuilder.setStorageType(PBHelperClient.convertStorageType(storageType)).build();
int protoLen = proto.getSerializedSize();
ByteBuf buffer =
channel.alloc().buffer(3 + CodedOutputStream.computeRawVarint32Size(protoLen) + protoLen);
@@ -654,7 +382,7 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
}
private static void initialize(Configuration conf, Channel channel, DatanodeInfo dnInfo,
Enum<?> storageType, OpWriteBlockProto.Builder writeBlockProtoBuilder, int timeoutMs,
StorageType storageType, OpWriteBlockProto.Builder writeBlockProtoBuilder, int timeoutMs,
DFSClient client, Token<BlockTokenIdentifier> accessToken, Promise<Channel> promise)
throws IOException {
Promise<Void> saslPromise = channel.eventLoop().newPromise();
@@ -678,7 +406,7 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
String clientName, LocatedBlock locatedBlock, long maxBytesRcvd, long latestGS,
BlockConstructionStage stage, DataChecksum summer, EventLoopGroup eventLoopGroup,
Class<? extends Channel> channelClass) {
Enum<?>[] storageTypes = locatedBlock.getStorageTypes();
StorageType[] storageTypes = locatedBlock.getStorageTypes();
DatanodeInfo[] datanodeInfos = locatedBlock.getLocations();
boolean connectToDnViaHostname =
conf.getBoolean(DFS_CLIENT_USE_DN_HOSTNAME, DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT);
@@ -686,8 +414,8 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
ExtendedBlock blockCopy = new ExtendedBlock(locatedBlock.getBlock());
blockCopy.setNumBytes(locatedBlock.getBlockSize());
ClientOperationHeaderProto header = ClientOperationHeaderProto.newBuilder()
.setBaseHeader(BaseHeaderProto.newBuilder().setBlock(PB_HELPER.convert(blockCopy))
.setToken(PB_HELPER.convert(locatedBlock.getBlockToken())))
.setBaseHeader(BaseHeaderProto.newBuilder().setBlock(PBHelperClient.convert(blockCopy))
.setToken(PBHelperClient.convert(locatedBlock.getBlockToken())))
.setClientName(clientName).build();
ChecksumProto checksumProto = DataTransferProtoUtil.toProto(summer);
OpWriteBlockProto.Builder writeBlockProtoBuilder = OpWriteBlockProto.newBuilder()
@@ -699,7 +427,7 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
List<Future<Channel>> futureList = new ArrayList<>(datanodeInfos.length);
for (int i = 0; i < datanodeInfos.length; i++) {
DatanodeInfo dnInfo = datanodeInfos[i];
Enum<?> storageType = storageTypes[i];
StorageType storageType = storageTypes[i];
Promise<Channel> promise = eventLoopGroup.next().newPromise();
futureList.add(promise);
String dnAddr = dnInfo.getXferAddr(connectToDnViaHostname);
@@ -771,8 +499,8 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
List<Future<Channel>> futureList = null;
try {
DataChecksum summer = createChecksum(client);
locatedBlock = BLOCK_ADDER.addBlock(namenode, src, client.getClientName(), null,
excludesNodes, stat.getFileId(), null);
locatedBlock = namenode.addBlock(src, client.getClientName(), null, excludesNodes,
stat.getFileId(), null, null);
List<Channel> datanodeList = new ArrayList<>();
futureList = connectToDataNodes(conf, client, clientName, locatedBlock, 0L, 0L,
PIPELINE_SETUP_CREATE, summer, eventLoopGroup, channelClass);

org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputSaslHelper.java

@@ -17,7 +17,7 @@
*/
package org.apache.hadoop.hbase.io.asyncfs;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY;
import static org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleState.READER_IDLE;
import com.google.protobuf.ByteString;
@@ -66,7 +66,7 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver;
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto.DataTransferEncryptorStatus;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.CipherOptionProto;
import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
import org.apache.hadoop.security.SaslPropertiesResolver;
@@ -128,16 +128,6 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
private static final SaslAdaptor SASL_ADAPTOR;
// helper class for convert protos.
private interface PBHelper {
List<CipherOptionProto> convertCipherOptions(List<CipherOption> options);
List<CipherOption> convertCipherOptionProtos(List<CipherOptionProto> options);
}
private static final PBHelper PB_HELPER;
private interface TransparentCryptoHelper {
Encryptor createEncryptor(Configuration conf, FileEncryptionInfo feInfo, DFSClient client)
@@ -188,42 +178,7 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
};
}
private static PBHelper createPBHelper() throws NoSuchMethodException {
Class<?> helperClass;
try {
helperClass = Class.forName("org.apache.hadoop.hdfs.protocolPB.PBHelperClient");
} catch (ClassNotFoundException e) {
LOG.debug("No PBHelperClient class found, should be hadoop 2.7-", e);
helperClass = org.apache.hadoop.hdfs.protocolPB.PBHelper.class;
}
Method convertCipherOptionsMethod = helperClass.getMethod("convertCipherOptions", List.class);
Method convertCipherOptionProtosMethod =
helperClass.getMethod("convertCipherOptionProtos", List.class);
return new PBHelper() {
@SuppressWarnings("unchecked")
@Override
public List<CipherOptionProto> convertCipherOptions(List<CipherOption> options) {
try {
return (List<CipherOptionProto>) convertCipherOptionsMethod.invoke(null, options);
} catch (IllegalAccessException | InvocationTargetException e) {
throw new RuntimeException(e);
}
}
@SuppressWarnings("unchecked")
@Override
public List<CipherOption> convertCipherOptionProtos(List<CipherOptionProto> options) {
try {
return (List<CipherOption>) convertCipherOptionProtosMethod.invoke(null, options);
} catch (IllegalAccessException | InvocationTargetException e) {
throw new RuntimeException(e);
}
}
};
}
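// No longer needed: PBHelperClient.convertCipherOptions / convertCipherOptionProtos are called
// directly below now that the class is always on the classpath.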
private static TransparentCryptoHelper createTransparentCryptoHelper27()
private static TransparentCryptoHelper createTransparentCryptoHelperWithoutHDFS12396()
throws NoSuchMethodException {
Method decryptEncryptedDataEncryptionKeyMethod = DFSClient.class
.getDeclaredMethod("decryptEncryptedDataEncryptionKey", FileEncryptionInfo.class);
@@ -252,7 +207,7 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
};
}
private static TransparentCryptoHelper createTransparentCryptoHelper28()
private static TransparentCryptoHelper createTransparentCryptoHelperWithHDFS12396()
throws ClassNotFoundException, NoSuchMethodException {
Class<?> hdfsKMSUtilCls = Class.forName("org.apache.hadoop.hdfs.HdfsKMSUtil");
Method decryptEncryptedDataEncryptionKeyMethod = hdfsKMSUtilCls.getDeclaredMethod(
@@ -285,18 +240,17 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
private static TransparentCryptoHelper createTransparentCryptoHelper()
throws NoSuchMethodException, ClassNotFoundException {
try {
return createTransparentCryptoHelper27();
return createTransparentCryptoHelperWithoutHDFS12396();
} catch (NoSuchMethodException e) {
LOG.debug("No decryptEncryptedDataEncryptionKey method in DFSClient, should be hadoop 2.8+",
e);
LOG.debug("No decryptEncryptedDataEncryptionKey method in DFSClient," +
" should be hadoop version with HDFS-12396", e);
}
return createTransparentCryptoHelper28();
return createTransparentCryptoHelperWithHDFS12396();
}
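// Note: the helpers are renamed from the 2.7/2.8 version suffixes to whether HDFS-12396 is
// present, i.e. whether decryptEncryptedDataEncryptionKey is still a DFSClient method or has
// moved to HdfsKMSUtil.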
static {
try {
SASL_ADAPTOR = createSaslAdaptor();
PB_HELPER = createPBHelper();
TRANSPARENT_CRYPTO_HELPER = createTransparentCryptoHelper();
} catch (Exception e) {
String msg = "Couldn't properly initialize access to HDFS internals. Please "
@@ -413,7 +367,7 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
builder.setPayload(ByteString.copyFrom(payload));
}
if (options != null) {
builder.addAllCipherOption(PB_HELPER.convertCipherOptions(options));
builder.addAllCipherOption(PBHelperClient.convertCipherOptions(options));
}
DataTransferEncryptorMessageProto proto = builder.build();
int size = proto.getSerializedSize();
@@ -498,7 +452,7 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
private CipherOption getCipherOption(DataTransferEncryptorMessageProto proto,
boolean isNegotiatedQopPrivacy, SaslClient saslClient) throws IOException {
List<CipherOption> cipherOptions =
PB_HELPER.convertCipherOptionProtos(proto.getCipherOptionList());
PBHelperClient.convertCipherOptionProtos(proto.getCipherOptionList());
if (cipherOptions == null || cipherOptions.isEmpty()) {
return null;
}