diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java index 0e5cf81c9f2..bd60729e87f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java @@ -21,9 +21,9 @@ import static org.apache.hadoop.fs.CreateFlag.CREATE; import static org.apache.hadoop.fs.CreateFlag.OVERWRITE; import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper.createEncryptor; import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper.trySaslNegotiate; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT; import static org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage.PIPELINE_SETUP_CREATE; import static org.apache.hbase.thirdparty.io.netty.channel.ChannelOption.CONNECT_TIMEOUT_MILLIS; import static org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleState.READER_IDLE; @@ -45,6 +45,7 @@ import org.apache.hadoop.fs.CreateFlag; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystemLinkResolver; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.fs.UnresolvedLinkException; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hbase.client.ConnectionUtils; @@ -63,6 +64,7 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil; import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol; import org.apache.hadoop.hdfs.protocol.datatransfer.Op; import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck; +import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck.ECN; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BaseHeaderProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.CachingStrategyProto; @@ -71,23 +73,19 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientOperationH import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.PipelineAckProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; -import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ExtendedBlockProto; -import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageTypeProto; +import org.apache.hadoop.hdfs.protocolPB.PBHelperClient; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException; import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException; import org.apache.hadoop.io.EnumSetWritable; import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.net.NetUtils; -import org.apache.hadoop.security.proto.SecurityProtos.TokenProto; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.DataChecksum; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.hbase.thirdparty.com.google.common.base.Throwables; -import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap; import org.apache.hbase.thirdparty.io.netty.bootstrap.Bootstrap; import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf; import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufAllocator; @@ -136,35 +134,6 @@ public final class FanOutOneBlockAsyncDFSOutputHelper { private static final DatanodeInfo[] EMPTY_DN_ARRAY = new DatanodeInfo[0]; - // helper class for getting Status from PipelineAckProto. In hadoop 2.6 or before, there is a - // getStatus method, and for hadoop 2.7 or after, the status is retrieved from flag. The flag may - // get from proto directly, or combined by the reply field of the proto and a ECN object. See - // createPipelineAckStatusGetter for more details. - private interface PipelineAckStatusGetter { - Status get(PipelineAckProto ack); - } - - private static final PipelineAckStatusGetter PIPELINE_ACK_STATUS_GETTER; - - // StorageType enum is placed under o.a.h.hdfs in hadoop 2.6 and o.a.h.fs in hadoop 2.7. So here - // we need to use reflection to set it.See createStorageTypeSetter for more details. - private interface StorageTypeSetter { - OpWriteBlockProto.Builder set(OpWriteBlockProto.Builder builder, Enum storageType); - } - - private static final StorageTypeSetter STORAGE_TYPE_SETTER; - - // helper class for calling add block method on namenode. There is a addBlockFlags parameter for - // hadoop 2.8 or later. See createBlockAdder for more details. - private interface BlockAdder { - - LocatedBlock addBlock(ClientProtocol namenode, String src, String clientName, - ExtendedBlock previous, DatanodeInfo[] excludeNodes, long fileId, String[] favoredNodes) - throws IOException; - } - - private static final BlockAdder BLOCK_ADDER; - private interface LeaseManager { void begin(DFSClient client, long inodeId); @@ -183,23 +152,6 @@ public final class FanOutOneBlockAsyncDFSOutputHelper { private static final DFSClientAdaptor DFS_CLIENT_ADAPTOR; - // helper class for convert protos. - private interface PBHelper { - - ExtendedBlockProto convert(ExtendedBlock b); - - TokenProto convert(Token tok); - } - - private static final PBHelper PB_HELPER; - - // helper class for creating data checksum. - private interface ChecksumCreater { - DataChecksum createChecksum(DFSClient client); - } - - private static final ChecksumCreater CHECKSUM_CREATER; - // helper class for creating files. private interface FileCreator { default HdfsFileStatus create(ClientProtocol instance, String src, FsPermission masked, @@ -269,234 +221,6 @@ public final class FanOutOneBlockAsyncDFSOutputHelper { }; } - private static PipelineAckStatusGetter createPipelineAckStatusGetter27() - throws NoSuchMethodException { - Method getFlagListMethod = PipelineAckProto.class.getMethod("getFlagList"); - @SuppressWarnings("rawtypes") - Class ecnClass; - try { - ecnClass = Class.forName("org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck$ECN") - .asSubclass(Enum.class); - } catch (ClassNotFoundException e) { - String msg = "Couldn't properly initialize the PipelineAck.ECN class. Please " + - "update your WAL Provider to not make use of the 'asyncfs' provider. See " + - "HBASE-16110 for more information."; - LOG.error(msg, e); - throw new Error(msg, e); - } - @SuppressWarnings("unchecked") - Enum disabledECN = Enum.valueOf(ecnClass, "DISABLED"); - Method getReplyMethod = PipelineAckProto.class.getMethod("getReply", int.class); - Method combineHeaderMethod = - PipelineAck.class.getMethod("combineHeader", ecnClass, Status.class); - Method getStatusFromHeaderMethod = - PipelineAck.class.getMethod("getStatusFromHeader", int.class); - return new PipelineAckStatusGetter() { - - @Override - public Status get(PipelineAckProto ack) { - try { - @SuppressWarnings("unchecked") - List flagList = (List) getFlagListMethod.invoke(ack); - Integer headerFlag; - if (flagList.isEmpty()) { - Status reply = (Status) getReplyMethod.invoke(ack, 0); - headerFlag = (Integer) combineHeaderMethod.invoke(null, disabledECN, reply); - } else { - headerFlag = flagList.get(0); - } - return (Status) getStatusFromHeaderMethod.invoke(null, headerFlag); - } catch (IllegalAccessException | InvocationTargetException e) { - throw new RuntimeException(e); - } - } - }; - } - - private static PipelineAckStatusGetter createPipelineAckStatusGetter26() - throws NoSuchMethodException { - Method getStatusMethod = PipelineAckProto.class.getMethod("getStatus", int.class); - return new PipelineAckStatusGetter() { - - @Override - public Status get(PipelineAckProto ack) { - try { - return (Status) getStatusMethod.invoke(ack, 0); - } catch (IllegalAccessException | InvocationTargetException e) { - throw new RuntimeException(e); - } - } - }; - } - - private static PipelineAckStatusGetter createPipelineAckStatusGetter() - throws NoSuchMethodException { - try { - return createPipelineAckStatusGetter27(); - } catch (NoSuchMethodException e) { - LOG.debug("Can not get expected method " + e.getMessage() + - ", this usually because your Hadoop is pre 2.7.0, " + - "try the methods in Hadoop 2.6.x instead."); - } - return createPipelineAckStatusGetter26(); - } - - private static StorageTypeSetter createStorageTypeSetter() throws NoSuchMethodException { - Method setStorageTypeMethod = - OpWriteBlockProto.Builder.class.getMethod("setStorageType", StorageTypeProto.class); - ImmutableMap.Builder builder = ImmutableMap.builder(); - for (StorageTypeProto storageTypeProto : StorageTypeProto.values()) { - builder.put(storageTypeProto.name(), storageTypeProto); - } - ImmutableMap name2ProtoEnum = builder.build(); - return new StorageTypeSetter() { - - @Override - public OpWriteBlockProto.Builder set(OpWriteBlockProto.Builder builder, Enum storageType) { - Object protoEnum = name2ProtoEnum.get(storageType.name()); - try { - setStorageTypeMethod.invoke(builder, protoEnum); - } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) { - throw new RuntimeException(e); - } - return builder; - } - }; - } - - private static BlockAdder createBlockAdder() throws NoSuchMethodException { - for (Method method : ClientProtocol.class.getMethods()) { - if (method.getName().equals("addBlock")) { - Method addBlockMethod = method; - Class[] paramTypes = addBlockMethod.getParameterTypes(); - if (paramTypes[paramTypes.length - 1] == String[].class) { - return new BlockAdder() { - - @Override - public LocatedBlock addBlock(ClientProtocol namenode, String src, String clientName, - ExtendedBlock previous, DatanodeInfo[] excludeNodes, long fileId, - String[] favoredNodes) throws IOException { - try { - return (LocatedBlock) addBlockMethod.invoke(namenode, src, clientName, previous, - excludeNodes, fileId, favoredNodes); - } catch (IllegalAccessException e) { - throw new RuntimeException(e); - } catch (InvocationTargetException e) { - Throwables.propagateIfPossible(e.getTargetException(), IOException.class); - throw new RuntimeException(e); - } - } - }; - } else { - return new BlockAdder() { - - @Override - public LocatedBlock addBlock(ClientProtocol namenode, String src, String clientName, - ExtendedBlock previous, DatanodeInfo[] excludeNodes, long fileId, - String[] favoredNodes) throws IOException { - try { - return (LocatedBlock) addBlockMethod.invoke(namenode, src, clientName, previous, - excludeNodes, fileId, favoredNodes, null); - } catch (IllegalAccessException e) { - throw new RuntimeException(e); - } catch (InvocationTargetException e) { - Throwables.propagateIfPossible(e.getTargetException(), IOException.class); - throw new RuntimeException(e); - } - } - }; - } - } - } - throw new NoSuchMethodException("Can not find addBlock method in ClientProtocol"); - } - - private static PBHelper createPBHelper() throws NoSuchMethodException { - Class helperClass; - String clazzName = "org.apache.hadoop.hdfs.protocolPB.PBHelperClient"; - try { - helperClass = Class.forName(clazzName); - } catch (ClassNotFoundException e) { - helperClass = org.apache.hadoop.hdfs.protocolPB.PBHelper.class; - LOG.debug("" + clazzName + " not found (Hadoop is pre-2.8.0?); using " + - helperClass.toString() + " instead."); - } - Method convertEBMethod = helperClass.getMethod("convert", ExtendedBlock.class); - Method convertTokenMethod = helperClass.getMethod("convert", Token.class); - return new PBHelper() { - - @Override - public ExtendedBlockProto convert(ExtendedBlock b) { - try { - return (ExtendedBlockProto) convertEBMethod.invoke(null, b); - } catch (IllegalAccessException | InvocationTargetException e) { - throw new RuntimeException(e); - } - } - - @Override - public TokenProto convert(Token tok) { - try { - return (TokenProto) convertTokenMethod.invoke(null, tok); - } catch (IllegalAccessException | InvocationTargetException e) { - throw new RuntimeException(e); - } - } - }; - } - - private static ChecksumCreater createChecksumCreater28(Method getConfMethod, Class confClass) - throws NoSuchMethodException { - for (Method method : confClass.getMethods()) { - if (method.getName().equals("createChecksum")) { - Method createChecksumMethod = method; - return new ChecksumCreater() { - - @Override - public DataChecksum createChecksum(DFSClient client) { - try { - return (DataChecksum) createChecksumMethod.invoke(getConfMethod.invoke(client), - (Object) null); - } catch (IllegalAccessException | InvocationTargetException e) { - throw new RuntimeException(e); - } - } - }; - } - } - throw new NoSuchMethodException("Can not find createChecksum method in DfsClientConf"); - } - - private static ChecksumCreater createChecksumCreater27(Method getConfMethod, Class confClass) - throws NoSuchMethodException { - Method createChecksumMethod = confClass.getDeclaredMethod("createChecksum"); - createChecksumMethod.setAccessible(true); - return new ChecksumCreater() { - - @Override - public DataChecksum createChecksum(DFSClient client) { - try { - return (DataChecksum) createChecksumMethod.invoke(getConfMethod.invoke(client)); - } catch (IllegalAccessException | InvocationTargetException e) { - throw new RuntimeException(e); - } - } - }; - } - - private static ChecksumCreater createChecksumCreater() - throws NoSuchMethodException, ClassNotFoundException { - Method getConfMethod = DFSClient.class.getMethod("getConf"); - try { - return createChecksumCreater28(getConfMethod, - Class.forName("org.apache.hadoop.hdfs.client.impl.DfsClientConf")); - } catch (ClassNotFoundException e) { - LOG.debug("No DfsClientConf class found, should be hadoop 2.7-", e); - } - return createChecksumCreater27(getConfMethod, - Class.forName("org.apache.hadoop.hdfs.DFSClient$Conf")); - } - private static FileCreator createFileCreator3() throws NoSuchMethodException { Method createMethod = ClientProtocol.class.getMethod("create", String.class, FsPermission.class, String.class, EnumSetWritable.class, boolean.class, short.class, long.class, @@ -547,13 +271,8 @@ public final class FanOutOneBlockAsyncDFSOutputHelper { static { try { - PIPELINE_ACK_STATUS_GETTER = createPipelineAckStatusGetter(); - STORAGE_TYPE_SETTER = createStorageTypeSetter(); - BLOCK_ADDER = createBlockAdder(); LEASE_MANAGER = createLeaseManager(); DFS_CLIENT_ADAPTOR = createDFSClientAdaptor(); - PB_HELPER = createPBHelper(); - CHECKSUM_CREATER = createChecksumCreater(); FILE_CREATOR = createFileCreator(); } catch (Exception e) { String msg = "Couldn't properly initialize access to HDFS internals. Please " + @@ -573,11 +292,19 @@ public final class FanOutOneBlockAsyncDFSOutputHelper { } static DataChecksum createChecksum(DFSClient client) { - return CHECKSUM_CREATER.createChecksum(client); + return client.getConf().createChecksum(null); } static Status getStatus(PipelineAckProto ack) { - return PIPELINE_ACK_STATUS_GETTER.get(ack); + List flagList = ack.getFlagList(); + Integer headerFlag; + if (flagList.isEmpty()) { + Status reply = ack.getReply(0); + headerFlag = PipelineAck.combineHeader(ECN.DISABLED, reply); + } else { + headerFlag = flagList.get(0); + } + return PipelineAck.getStatusFromHeader(headerFlag); } private static void processWriteBlockResponse(Channel channel, DatanodeInfo dnInfo, @@ -641,12 +368,13 @@ public final class FanOutOneBlockAsyncDFSOutputHelper { }); } - private static void requestWriteBlock(Channel channel, Enum storageType, + private static void requestWriteBlock(Channel channel, StorageType storageType, OpWriteBlockProto.Builder writeBlockProtoBuilder) throws IOException { - OpWriteBlockProto proto = STORAGE_TYPE_SETTER.set(writeBlockProtoBuilder, storageType).build(); + OpWriteBlockProto proto = + writeBlockProtoBuilder.setStorageType(PBHelperClient.convertStorageType(storageType)).build(); int protoLen = proto.getSerializedSize(); ByteBuf buffer = - channel.alloc().buffer(3 + CodedOutputStream.computeRawVarint32Size(protoLen) + protoLen); + channel.alloc().buffer(3 + CodedOutputStream.computeRawVarint32Size(protoLen) + protoLen); buffer.writeShort(DataTransferProtocol.DATA_TRANSFER_VERSION); buffer.writeByte(Op.WRITE_BLOCK.code); proto.writeDelimitedTo(new ByteBufOutputStream(buffer)); @@ -654,7 +382,7 @@ public final class FanOutOneBlockAsyncDFSOutputHelper { } private static void initialize(Configuration conf, Channel channel, DatanodeInfo dnInfo, - Enum storageType, OpWriteBlockProto.Builder writeBlockProtoBuilder, int timeoutMs, + StorageType storageType, OpWriteBlockProto.Builder writeBlockProtoBuilder, int timeoutMs, DFSClient client, Token accessToken, Promise promise) throws IOException { Promise saslPromise = channel.eventLoop().newPromise(); @@ -678,7 +406,7 @@ public final class FanOutOneBlockAsyncDFSOutputHelper { String clientName, LocatedBlock locatedBlock, long maxBytesRcvd, long latestGS, BlockConstructionStage stage, DataChecksum summer, EventLoopGroup eventLoopGroup, Class channelClass) { - Enum[] storageTypes = locatedBlock.getStorageTypes(); + StorageType[] storageTypes = locatedBlock.getStorageTypes(); DatanodeInfo[] datanodeInfos = locatedBlock.getLocations(); boolean connectToDnViaHostname = conf.getBoolean(DFS_CLIENT_USE_DN_HOSTNAME, DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT); @@ -686,9 +414,9 @@ public final class FanOutOneBlockAsyncDFSOutputHelper { ExtendedBlock blockCopy = new ExtendedBlock(locatedBlock.getBlock()); blockCopy.setNumBytes(locatedBlock.getBlockSize()); ClientOperationHeaderProto header = ClientOperationHeaderProto.newBuilder() - .setBaseHeader(BaseHeaderProto.newBuilder().setBlock(PB_HELPER.convert(blockCopy)) - .setToken(PB_HELPER.convert(locatedBlock.getBlockToken()))) - .setClientName(clientName).build(); + .setBaseHeader(BaseHeaderProto.newBuilder().setBlock(PBHelperClient.convert(blockCopy)) + .setToken(PBHelperClient.convert(locatedBlock.getBlockToken()))) + .setClientName(clientName).build(); ChecksumProto checksumProto = DataTransferProtoUtil.toProto(summer); OpWriteBlockProto.Builder writeBlockProtoBuilder = OpWriteBlockProto.newBuilder() .setHeader(header).setStage(OpWriteBlockProto.BlockConstructionStage.valueOf(stage.name())) @@ -699,7 +427,7 @@ public final class FanOutOneBlockAsyncDFSOutputHelper { List> futureList = new ArrayList<>(datanodeInfos.length); for (int i = 0; i < datanodeInfos.length; i++) { DatanodeInfo dnInfo = datanodeInfos[i]; - Enum storageType = storageTypes[i]; + StorageType storageType = storageTypes[i]; Promise promise = eventLoopGroup.next().newPromise(); futureList.add(promise); String dnAddr = dnInfo.getXferAddr(connectToDnViaHostname); @@ -771,8 +499,8 @@ public final class FanOutOneBlockAsyncDFSOutputHelper { List> futureList = null; try { DataChecksum summer = createChecksum(client); - locatedBlock = BLOCK_ADDER.addBlock(namenode, src, client.getClientName(), null, - excludesNodes, stat.getFileId(), null); + locatedBlock = namenode.addBlock(src, client.getClientName(), null, excludesNodes, + stat.getFileId(), null, null); List datanodeList = new ArrayList<>(); futureList = connectToDataNodes(conf, client, clientName, locatedBlock, 0L, 0L, PIPELINE_SETUP_CREATE, summer, eventLoopGroup, channelClass); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputSaslHelper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputSaslHelper.java index a56c3d75e91..eb7bd04ca4c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputSaslHelper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputSaslHelper.java @@ -17,7 +17,7 @@ */ package org.apache.hadoop.hbase.io.asyncfs; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY; import static org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleState.READER_IDLE; import com.google.protobuf.ByteString; @@ -66,7 +66,7 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver; import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto.DataTransferEncryptorStatus; -import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.CipherOptionProto; +import org.apache.hadoop.hdfs.protocolPB.PBHelperClient; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey; import org.apache.hadoop.security.SaslPropertiesResolver; @@ -128,16 +128,6 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper { private static final SaslAdaptor SASL_ADAPTOR; - // helper class for convert protos. - private interface PBHelper { - - List convertCipherOptions(List options); - - List convertCipherOptionProtos(List options); - } - - private static final PBHelper PB_HELPER; - private interface TransparentCryptoHelper { Encryptor createEncryptor(Configuration conf, FileEncryptionInfo feInfo, DFSClient client) @@ -188,42 +178,7 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper { }; } - private static PBHelper createPBHelper() throws NoSuchMethodException { - Class helperClass; - try { - helperClass = Class.forName("org.apache.hadoop.hdfs.protocolPB.PBHelperClient"); - } catch (ClassNotFoundException e) { - LOG.debug("No PBHelperClient class found, should be hadoop 2.7-", e); - helperClass = org.apache.hadoop.hdfs.protocolPB.PBHelper.class; - } - Method convertCipherOptionsMethod = helperClass.getMethod("convertCipherOptions", List.class); - Method convertCipherOptionProtosMethod = - helperClass.getMethod("convertCipherOptionProtos", List.class); - return new PBHelper() { - - @SuppressWarnings("unchecked") - @Override - public List convertCipherOptions(List options) { - try { - return (List) convertCipherOptionsMethod.invoke(null, options); - } catch (IllegalAccessException | InvocationTargetException e) { - throw new RuntimeException(e); - } - } - - @SuppressWarnings("unchecked") - @Override - public List convertCipherOptionProtos(List options) { - try { - return (List) convertCipherOptionProtosMethod.invoke(null, options); - } catch (IllegalAccessException | InvocationTargetException e) { - throw new RuntimeException(e); - } - } - }; - } - - private static TransparentCryptoHelper createTransparentCryptoHelper27() + private static TransparentCryptoHelper createTransparentCryptoHelperWithoutHDFS12396() throws NoSuchMethodException { Method decryptEncryptedDataEncryptionKeyMethod = DFSClient.class .getDeclaredMethod("decryptEncryptedDataEncryptionKey", FileEncryptionInfo.class); @@ -252,7 +207,7 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper { }; } - private static TransparentCryptoHelper createTransparentCryptoHelper28() + private static TransparentCryptoHelper createTransparentCryptoHelperWithHDFS12396() throws ClassNotFoundException, NoSuchMethodException { Class hdfsKMSUtilCls = Class.forName("org.apache.hadoop.hdfs.HdfsKMSUtil"); Method decryptEncryptedDataEncryptionKeyMethod = hdfsKMSUtilCls.getDeclaredMethod( @@ -285,18 +240,17 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper { private static TransparentCryptoHelper createTransparentCryptoHelper() throws NoSuchMethodException, ClassNotFoundException { try { - return createTransparentCryptoHelper27(); + return createTransparentCryptoHelperWithoutHDFS12396(); } catch (NoSuchMethodException e) { - LOG.debug("No decryptEncryptedDataEncryptionKey method in DFSClient, should be hadoop 2.8+", - e); + LOG.debug("No decryptEncryptedDataEncryptionKey method in DFSClient," + + " should be hadoop version with HDFS-12396", e); } - return createTransparentCryptoHelper28(); + return createTransparentCryptoHelperWithHDFS12396(); } static { try { SASL_ADAPTOR = createSaslAdaptor(); - PB_HELPER = createPBHelper(); TRANSPARENT_CRYPTO_HELPER = createTransparentCryptoHelper(); } catch (Exception e) { String msg = "Couldn't properly initialize access to HDFS internals. Please " @@ -413,7 +367,7 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper { builder.setPayload(ByteString.copyFrom(payload)); } if (options != null) { - builder.addAllCipherOption(PB_HELPER.convertCipherOptions(options)); + builder.addAllCipherOption(PBHelperClient.convertCipherOptions(options)); } DataTransferEncryptorMessageProto proto = builder.build(); int size = proto.getSerializedSize(); @@ -498,7 +452,7 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper { private CipherOption getCipherOption(DataTransferEncryptorMessageProto proto, boolean isNegotiatedQopPrivacy, SaslClient saslClient) throws IOException { List cipherOptions = - PB_HELPER.convertCipherOptionProtos(proto.getCipherOptionList()); + PBHelperClient.convertCipherOptionProtos(proto.getCipherOptionList()); if (cipherOptions == null || cipherOptions.isEmpty()) { return null; }