diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java index 7a429c2a4c2..ed9da5fd53b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java @@ -69,7 +69,6 @@ import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandler.Sharable; import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext; import org.apache.hbase.thirdparty.io.netty.channel.ChannelId; import org.apache.hbase.thirdparty.io.netty.channel.SimpleChannelInboundHandler; -import org.apache.hbase.thirdparty.io.netty.handler.codec.protobuf.ProtobufDecoder; import org.apache.hbase.thirdparty.io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder; import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateEvent; import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateHandler; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java index 32cdfcff315..e372726b24e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java @@ -99,7 +99,6 @@ import org.apache.hbase.thirdparty.io.netty.channel.ChannelPipeline; import org.apache.hbase.thirdparty.io.netty.channel.EventLoop; import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup; import org.apache.hbase.thirdparty.io.netty.channel.SimpleChannelInboundHandler; -import org.apache.hbase.thirdparty.io.netty.handler.codec.protobuf.ProtobufDecoder; import org.apache.hbase.thirdparty.io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder; import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateEvent; import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateHandler; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputSaslHelper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputSaslHelper.java index 59215dea74d..090b9b4a63f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputSaslHelper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputSaslHelper.java @@ -20,9 +20,9 @@ package org.apache.hadoop.hbase.io.asyncfs; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY; import static org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleState.READER_IDLE; -import com.google.protobuf.ByteString; import com.google.protobuf.CodedOutputStream; import java.io.IOException; +import java.lang.reflect.Constructor; import java.lang.reflect.Field; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; @@ -93,7 +93,6 @@ import org.apache.hbase.thirdparty.io.netty.channel.ChannelPromise; import org.apache.hbase.thirdparty.io.netty.channel.SimpleChannelInboundHandler; import org.apache.hbase.thirdparty.io.netty.handler.codec.LengthFieldBasedFrameDecoder; import org.apache.hbase.thirdparty.io.netty.handler.codec.MessageToByteEncoder; -import org.apache.hbase.thirdparty.io.netty.handler.codec.protobuf.ProtobufDecoder; import org.apache.hbase.thirdparty.io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder; import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateEvent; import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateHandler; @@ -355,15 +354,93 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper { return Collections.singletonList(new CipherOption(CipherSuite.AES_CTR_NOPADDING)); } + /** + * The asyncfs subsystem emulates a HDFS client by sending protobuf messages via netty. + * After Hadoop 3.3.0, the protobuf classes are relocated to org.apache.hadoop.thirdparty.protobuf.*. + * Use Reflection to check which ones to use. + */ + private static class BuilderPayloadSetter { + private static Method setPayloadMethod; + private static Constructor constructor; + + /** + * Create a ByteString from byte array without copying (wrap), and then set it as the payload + * for the builder. + * + * @param builder builder for HDFS DataTransferEncryptorMessage. + * @param payload byte array of payload. + * @throws IOException + */ + static void wrapAndSetPayload(DataTransferEncryptorMessageProto.Builder builder, byte[] payload) + throws IOException { + Object byteStringObject; + try { + // byteStringObject = new LiteralByteString(payload); + byteStringObject = constructor.newInstance(payload); + // builder.setPayload(byteStringObject); + setPayloadMethod.invoke(builder, constructor.getDeclaringClass().cast(byteStringObject)); + } catch (IllegalAccessException | InstantiationException e) { + throw new RuntimeException(e); + + } catch (InvocationTargetException e) { + Throwables.propagateIfPossible(e.getTargetException(), IOException.class); + throw new RuntimeException(e.getTargetException()); + } + } + + static { + Class builderClass = DataTransferEncryptorMessageProto.Builder.class; + + // Try the unrelocated ByteString + Class byteStringClass = com.google.protobuf.ByteString.class; + try { + // See if it can load the relocated ByteString, which comes from hadoop-thirdparty. + byteStringClass = Class.forName("org.apache.hadoop.thirdparty.protobuf.ByteString"); + LOG.debug("Found relocated ByteString class from hadoop-thirdparty." + + " Assuming this is Hadoop 3.3.0+."); + } catch (ClassNotFoundException e) { + LOG.debug("Did not find relocated ByteString class from hadoop-thirdparty." + + " Assuming this is below Hadoop 3.3.0", e); + } + + // LiteralByteString is a package private class in protobuf. Make it accessible. + Class literalByteStringClass; + try { + literalByteStringClass = Class.forName( + "org.apache.hadoop.thirdparty.protobuf.ByteString$LiteralByteString"); + LOG.debug("Shaded LiteralByteString from hadoop-thirdparty is found."); + } catch (ClassNotFoundException e) { + try { + literalByteStringClass = Class.forName("com.google.protobuf.LiteralByteString"); + LOG.debug("com.google.protobuf.LiteralByteString found."); + } catch (ClassNotFoundException ex) { + throw new RuntimeException(ex); + } + } + + try { + constructor = literalByteStringClass.getDeclaredConstructor(byte[].class); + constructor.setAccessible(true); + } catch (NoSuchMethodException e) { + throw new RuntimeException(e); + } + + try { + setPayloadMethod = builderClass.getMethod("setPayload", byteStringClass); + } catch (NoSuchMethodException e) { + // if either method is not found, we are in big trouble. Abort. + throw new RuntimeException(e); + } + } + } + private void sendSaslMessage(ChannelHandlerContext ctx, byte[] payload, List options) throws IOException { DataTransferEncryptorMessageProto.Builder builder = DataTransferEncryptorMessageProto.newBuilder(); builder.setStatus(DataTransferEncryptorStatus.SUCCESS); if (payload != null) { - // Was ByteStringer; fix w/o using ByteStringer. Its in hbase-protocol - // and we want to keep that out of hbase-server. - builder.setPayload(ByteString.copyFrom(payload)); + BuilderPayloadSetter.wrapAndSetPayload(builder, payload); } if (options != null) { builder.addAllCipherOption(PBHelperClient.convertCipherOptions(options)); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/ProtobufDecoder.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/ProtobufDecoder.java new file mode 100644 index 00000000000..98b4e6f08e1 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/ProtobufDecoder.java @@ -0,0 +1,148 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.io.asyncfs; + +import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf; +import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufUtil; +import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext; +import org.apache.hbase.thirdparty.io.netty.handler.codec.MessageToMessageDecoder; +import org.apache.hbase.thirdparty.io.netty.util.internal.ObjectUtil; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.List; + +/** + * Modified based on io.netty.handler.codec.protobuf.ProtobufDecoder. + * The Netty's ProtobufDecode supports unshaded protobuf messages (com.google.protobuf). + * + * Hadoop 3.3.0 and above relocates protobuf classes to a shaded jar (hadoop-thirdparty), and + * so we must use reflection to detect which one (relocated or not) to use. + * + * Do not use this to process HBase's shaded protobuf messages. This is meant to process the + * protobuf messages in HDFS for the asyncfs use case. + * */ +@InterfaceAudience.Private +public class ProtobufDecoder extends MessageToMessageDecoder { + private static final Logger LOG = + LoggerFactory.getLogger(ProtobufDecoder.class); + + private static Class protobufMessageLiteClass = null; + private static Class protobufMessageLiteBuilderClass = null; + + private static final boolean HAS_PARSER; + + private static Method getParserForTypeMethod; + private static Method newBuilderForTypeMethod; + + private Method parseFromMethod; + private Method mergeFromMethod; + private Method buildMethod; + + private Object parser; + private Object builder; + + + public ProtobufDecoder(Object prototype) { + try { + Method getDefaultInstanceForTypeMethod = protobufMessageLiteClass.getMethod( + "getDefaultInstanceForType"); + Object prototype1 = getDefaultInstanceForTypeMethod + .invoke(ObjectUtil.checkNotNull(prototype, "prototype")); + + // parser = prototype.getParserForType() + parser = getParserForTypeMethod.invoke(prototype1); + parseFromMethod = parser.getClass().getMethod( + "parseFrom", byte[].class, int.class, int.class); + + // builder = prototype.newBuilderForType(); + builder = newBuilderForTypeMethod.invoke(prototype1); + mergeFromMethod = builder.getClass().getMethod( + "mergeFrom", byte[].class, int.class, int.class); + + // All protobuf message builders inherits from MessageLite.Builder + buildMethod = protobufMessageLiteBuilderClass.getDeclaredMethod("build"); + + } catch (IllegalAccessException | NoSuchMethodException e) { + throw new RuntimeException(e); + } catch (InvocationTargetException e) { + throw new RuntimeException(e.getTargetException()); + } + } + + protected void decode( + ChannelHandlerContext ctx, ByteBuf msg, List out) throws Exception { + int length = msg.readableBytes(); + byte[] array; + int offset; + if (msg.hasArray()) { + array = msg.array(); + offset = msg.arrayOffset() + msg.readerIndex(); + } else { + array = ByteBufUtil.getBytes(msg, msg.readerIndex(), length, false); + offset = 0; + } + + Object addObj; + if (HAS_PARSER) { + // addObj = parser.parseFrom(array, offset, length); + addObj = parseFromMethod.invoke(parser, array, offset, length); + } else { + // addObj = builder.mergeFrom(array, offset, length).build(); + Object builderObj = mergeFromMethod.invoke(builder, array, offset, length); + addObj = buildMethod.invoke(builderObj); + } + out.add(addObj); + } + + static { + boolean hasParser = false; + + // These are the protobuf classes coming from Hadoop. Not the one from hbase-shaded-protobuf + protobufMessageLiteClass = com.google.protobuf.MessageLite.class; + protobufMessageLiteBuilderClass = com.google.protobuf.MessageLite.Builder.class; + + try { + protobufMessageLiteClass = Class.forName("org.apache.hadoop.thirdparty.protobuf.MessageLite"); + protobufMessageLiteBuilderClass = Class.forName( + "org.apache.hadoop.thirdparty.protobuf.MessageLite.Builder"); + LOG.debug("Hadoop 3.3 and above shades protobuf."); + } catch (ClassNotFoundException e) { + LOG.debug("Hadoop 3.2 and below use unshaded protobuf.", e); + } + + try { + getParserForTypeMethod = protobufMessageLiteClass.getDeclaredMethod("getParserForType"); + newBuilderForTypeMethod = protobufMessageLiteClass.getDeclaredMethod("newBuilderForType"); + } catch (NoSuchMethodException e) { + // If the method is not found, we are in trouble. Abort. + throw new RuntimeException(e); + } + + try { + protobufMessageLiteClass.getDeclaredMethod("getParserForType"); + hasParser = true; + } catch (Throwable var2) { + } + + HAS_PARSER = hasParser; + } +}