* Use Reflection to access shaded Hadoop protobuf classes. (cherry picked from commit a321e536989083ca3620bf2c53f12c07740bf5b0) * Update to improve the code: 1. Added license. 2. Added more comments. 3. Wrap byte array instead of copy to make a ByteString. 4. Moved all reflection instantiation to static class loading time. * Use LiteralByteString to wrap byte array instead of copying it. Signed-off-by: Duo Zhang <zhangduo@apache.org> Signed-off-by: stack <stack@apache.org> (cherry picked from commit72727ff9be
) (cherry picked from commitae3de38bed
)
This commit is contained in:
parent
fe423c47d7
commit
b8b8e0afd4
|
@ -69,7 +69,6 @@ import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandler.Sharable;
|
||||||
import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
|
import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
|
||||||
import org.apache.hbase.thirdparty.io.netty.channel.ChannelId;
|
import org.apache.hbase.thirdparty.io.netty.channel.ChannelId;
|
||||||
import org.apache.hbase.thirdparty.io.netty.channel.SimpleChannelInboundHandler;
|
import org.apache.hbase.thirdparty.io.netty.channel.SimpleChannelInboundHandler;
|
||||||
import org.apache.hbase.thirdparty.io.netty.handler.codec.protobuf.ProtobufDecoder;
|
|
||||||
import org.apache.hbase.thirdparty.io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
|
import org.apache.hbase.thirdparty.io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
|
||||||
import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateEvent;
|
import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateEvent;
|
||||||
import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateHandler;
|
import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateHandler;
|
||||||
|
|
|
@ -99,7 +99,6 @@ import org.apache.hbase.thirdparty.io.netty.channel.ChannelPipeline;
|
||||||
import org.apache.hbase.thirdparty.io.netty.channel.EventLoop;
|
import org.apache.hbase.thirdparty.io.netty.channel.EventLoop;
|
||||||
import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
|
import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
|
||||||
import org.apache.hbase.thirdparty.io.netty.channel.SimpleChannelInboundHandler;
|
import org.apache.hbase.thirdparty.io.netty.channel.SimpleChannelInboundHandler;
|
||||||
import org.apache.hbase.thirdparty.io.netty.handler.codec.protobuf.ProtobufDecoder;
|
|
||||||
import org.apache.hbase.thirdparty.io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
|
import org.apache.hbase.thirdparty.io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
|
||||||
import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateEvent;
|
import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateEvent;
|
||||||
import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateHandler;
|
import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateHandler;
|
||||||
|
|
|
@ -20,9 +20,9 @@ package org.apache.hadoop.hbase.io.asyncfs;
|
||||||
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY;
|
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY;
|
||||||
import static org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleState.READER_IDLE;
|
import static org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleState.READER_IDLE;
|
||||||
|
|
||||||
import com.google.protobuf.ByteString;
|
|
||||||
import com.google.protobuf.CodedOutputStream;
|
import com.google.protobuf.CodedOutputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.lang.reflect.Constructor;
|
||||||
import java.lang.reflect.Field;
|
import java.lang.reflect.Field;
|
||||||
import java.lang.reflect.InvocationTargetException;
|
import java.lang.reflect.InvocationTargetException;
|
||||||
import java.lang.reflect.Method;
|
import java.lang.reflect.Method;
|
||||||
|
@ -93,7 +93,6 @@ import org.apache.hbase.thirdparty.io.netty.channel.ChannelPromise;
|
||||||
import org.apache.hbase.thirdparty.io.netty.channel.SimpleChannelInboundHandler;
|
import org.apache.hbase.thirdparty.io.netty.channel.SimpleChannelInboundHandler;
|
||||||
import org.apache.hbase.thirdparty.io.netty.handler.codec.LengthFieldBasedFrameDecoder;
|
import org.apache.hbase.thirdparty.io.netty.handler.codec.LengthFieldBasedFrameDecoder;
|
||||||
import org.apache.hbase.thirdparty.io.netty.handler.codec.MessageToByteEncoder;
|
import org.apache.hbase.thirdparty.io.netty.handler.codec.MessageToByteEncoder;
|
||||||
import org.apache.hbase.thirdparty.io.netty.handler.codec.protobuf.ProtobufDecoder;
|
|
||||||
import org.apache.hbase.thirdparty.io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
|
import org.apache.hbase.thirdparty.io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
|
||||||
import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateEvent;
|
import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateEvent;
|
||||||
import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateHandler;
|
import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateHandler;
|
||||||
|
@ -355,15 +354,93 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
|
||||||
return Collections.singletonList(new CipherOption(CipherSuite.AES_CTR_NOPADDING));
|
return Collections.singletonList(new CipherOption(CipherSuite.AES_CTR_NOPADDING));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The asyncfs subsystem emulates a HDFS client by sending protobuf messages via netty.
|
||||||
|
* After Hadoop 3.3.0, the protobuf classes are relocated to org.apache.hadoop.thirdparty.protobuf.*.
|
||||||
|
* Use Reflection to check which ones to use.
|
||||||
|
*/
|
||||||
|
private static class BuilderPayloadSetter {
|
||||||
|
private static Method setPayloadMethod;
|
||||||
|
private static Constructor<?> constructor;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a ByteString from byte array without copying (wrap), and then set it as the payload
|
||||||
|
* for the builder.
|
||||||
|
*
|
||||||
|
* @param builder builder for HDFS DataTransferEncryptorMessage.
|
||||||
|
* @param payload byte array of payload.
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
static void wrapAndSetPayload(DataTransferEncryptorMessageProto.Builder builder, byte[] payload)
|
||||||
|
throws IOException {
|
||||||
|
Object byteStringObject;
|
||||||
|
try {
|
||||||
|
// byteStringObject = new LiteralByteString(payload);
|
||||||
|
byteStringObject = constructor.newInstance(payload);
|
||||||
|
// builder.setPayload(byteStringObject);
|
||||||
|
setPayloadMethod.invoke(builder, constructor.getDeclaringClass().cast(byteStringObject));
|
||||||
|
} catch (IllegalAccessException | InstantiationException e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
|
||||||
|
} catch (InvocationTargetException e) {
|
||||||
|
Throwables.propagateIfPossible(e.getTargetException(), IOException.class);
|
||||||
|
throw new RuntimeException(e.getTargetException());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static {
|
||||||
|
Class<?> builderClass = DataTransferEncryptorMessageProto.Builder.class;
|
||||||
|
|
||||||
|
// Try the unrelocated ByteString
|
||||||
|
Class<?> byteStringClass = com.google.protobuf.ByteString.class;
|
||||||
|
try {
|
||||||
|
// See if it can load the relocated ByteString, which comes from hadoop-thirdparty.
|
||||||
|
byteStringClass = Class.forName("org.apache.hadoop.thirdparty.protobuf.ByteString");
|
||||||
|
LOG.debug("Found relocated ByteString class from hadoop-thirdparty." +
|
||||||
|
" Assuming this is Hadoop 3.3.0+.");
|
||||||
|
} catch (ClassNotFoundException e) {
|
||||||
|
LOG.debug("Did not find relocated ByteString class from hadoop-thirdparty." +
|
||||||
|
" Assuming this is below Hadoop 3.3.0", e);
|
||||||
|
}
|
||||||
|
|
||||||
|
// LiteralByteString is a package private class in protobuf. Make it accessible.
|
||||||
|
Class<?> literalByteStringClass;
|
||||||
|
try {
|
||||||
|
literalByteStringClass = Class.forName(
|
||||||
|
"org.apache.hadoop.thirdparty.protobuf.ByteString$LiteralByteString");
|
||||||
|
LOG.debug("Shaded LiteralByteString from hadoop-thirdparty is found.");
|
||||||
|
} catch (ClassNotFoundException e) {
|
||||||
|
try {
|
||||||
|
literalByteStringClass = Class.forName("com.google.protobuf.LiteralByteString");
|
||||||
|
LOG.debug("com.google.protobuf.LiteralByteString found.");
|
||||||
|
} catch (ClassNotFoundException ex) {
|
||||||
|
throw new RuntimeException(ex);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
constructor = literalByteStringClass.getDeclaredConstructor(byte[].class);
|
||||||
|
constructor.setAccessible(true);
|
||||||
|
} catch (NoSuchMethodException e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
setPayloadMethod = builderClass.getMethod("setPayload", byteStringClass);
|
||||||
|
} catch (NoSuchMethodException e) {
|
||||||
|
// if either method is not found, we are in big trouble. Abort.
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private void sendSaslMessage(ChannelHandlerContext ctx, byte[] payload,
|
private void sendSaslMessage(ChannelHandlerContext ctx, byte[] payload,
|
||||||
List<CipherOption> options) throws IOException {
|
List<CipherOption> options) throws IOException {
|
||||||
DataTransferEncryptorMessageProto.Builder builder =
|
DataTransferEncryptorMessageProto.Builder builder =
|
||||||
DataTransferEncryptorMessageProto.newBuilder();
|
DataTransferEncryptorMessageProto.newBuilder();
|
||||||
builder.setStatus(DataTransferEncryptorStatus.SUCCESS);
|
builder.setStatus(DataTransferEncryptorStatus.SUCCESS);
|
||||||
if (payload != null) {
|
if (payload != null) {
|
||||||
// Was ByteStringer; fix w/o using ByteStringer. Its in hbase-protocol
|
BuilderPayloadSetter.wrapAndSetPayload(builder, payload);
|
||||||
// and we want to keep that out of hbase-server.
|
|
||||||
builder.setPayload(ByteString.copyFrom(payload));
|
|
||||||
}
|
}
|
||||||
if (options != null) {
|
if (options != null) {
|
||||||
builder.addAllCipherOption(PBHelperClient.convertCipherOptions(options));
|
builder.addAllCipherOption(PBHelperClient.convertCipherOptions(options));
|
||||||
|
|
|
@ -0,0 +1,148 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.io.asyncfs;
|
||||||
|
|
||||||
|
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
|
||||||
|
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufUtil;
|
||||||
|
import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
|
||||||
|
import org.apache.hbase.thirdparty.io.netty.handler.codec.MessageToMessageDecoder;
|
||||||
|
import org.apache.hbase.thirdparty.io.netty.util.internal.ObjectUtil;
|
||||||
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import java.lang.reflect.InvocationTargetException;
|
||||||
|
import java.lang.reflect.Method;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Modified based on io.netty.handler.codec.protobuf.ProtobufDecoder.
|
||||||
|
* The Netty's ProtobufDecode supports unshaded protobuf messages (com.google.protobuf).
|
||||||
|
*
|
||||||
|
* Hadoop 3.3.0 and above relocates protobuf classes to a shaded jar (hadoop-thirdparty), and
|
||||||
|
* so we must use reflection to detect which one (relocated or not) to use.
|
||||||
|
*
|
||||||
|
* Do not use this to process HBase's shaded protobuf messages. This is meant to process the
|
||||||
|
* protobuf messages in HDFS for the asyncfs use case.
|
||||||
|
* */
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
public class ProtobufDecoder extends MessageToMessageDecoder<ByteBuf> {
|
||||||
|
private static final Logger LOG =
|
||||||
|
LoggerFactory.getLogger(ProtobufDecoder.class);
|
||||||
|
|
||||||
|
private static Class<?> protobufMessageLiteClass = null;
|
||||||
|
private static Class<?> protobufMessageLiteBuilderClass = null;
|
||||||
|
|
||||||
|
private static final boolean HAS_PARSER;
|
||||||
|
|
||||||
|
private static Method getParserForTypeMethod;
|
||||||
|
private static Method newBuilderForTypeMethod;
|
||||||
|
|
||||||
|
private Method parseFromMethod;
|
||||||
|
private Method mergeFromMethod;
|
||||||
|
private Method buildMethod;
|
||||||
|
|
||||||
|
private Object parser;
|
||||||
|
private Object builder;
|
||||||
|
|
||||||
|
|
||||||
|
public ProtobufDecoder(Object prototype) {
|
||||||
|
try {
|
||||||
|
Method getDefaultInstanceForTypeMethod = protobufMessageLiteClass.getMethod(
|
||||||
|
"getDefaultInstanceForType");
|
||||||
|
Object prototype1 = getDefaultInstanceForTypeMethod
|
||||||
|
.invoke(ObjectUtil.checkNotNull(prototype, "prototype"));
|
||||||
|
|
||||||
|
// parser = prototype.getParserForType()
|
||||||
|
parser = getParserForTypeMethod.invoke(prototype1);
|
||||||
|
parseFromMethod = parser.getClass().getMethod(
|
||||||
|
"parseFrom", byte[].class, int.class, int.class);
|
||||||
|
|
||||||
|
// builder = prototype.newBuilderForType();
|
||||||
|
builder = newBuilderForTypeMethod.invoke(prototype1);
|
||||||
|
mergeFromMethod = builder.getClass().getMethod(
|
||||||
|
"mergeFrom", byte[].class, int.class, int.class);
|
||||||
|
|
||||||
|
// All protobuf message builders inherits from MessageLite.Builder
|
||||||
|
buildMethod = protobufMessageLiteBuilderClass.getDeclaredMethod("build");
|
||||||
|
|
||||||
|
} catch (IllegalAccessException | NoSuchMethodException e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
} catch (InvocationTargetException e) {
|
||||||
|
throw new RuntimeException(e.getTargetException());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void decode(
|
||||||
|
ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
|
||||||
|
int length = msg.readableBytes();
|
||||||
|
byte[] array;
|
||||||
|
int offset;
|
||||||
|
if (msg.hasArray()) {
|
||||||
|
array = msg.array();
|
||||||
|
offset = msg.arrayOffset() + msg.readerIndex();
|
||||||
|
} else {
|
||||||
|
array = ByteBufUtil.getBytes(msg, msg.readerIndex(), length, false);
|
||||||
|
offset = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
Object addObj;
|
||||||
|
if (HAS_PARSER) {
|
||||||
|
// addObj = parser.parseFrom(array, offset, length);
|
||||||
|
addObj = parseFromMethod.invoke(parser, array, offset, length);
|
||||||
|
} else {
|
||||||
|
// addObj = builder.mergeFrom(array, offset, length).build();
|
||||||
|
Object builderObj = mergeFromMethod.invoke(builder, array, offset, length);
|
||||||
|
addObj = buildMethod.invoke(builderObj);
|
||||||
|
}
|
||||||
|
out.add(addObj);
|
||||||
|
}
|
||||||
|
|
||||||
|
static {
|
||||||
|
boolean hasParser = false;
|
||||||
|
|
||||||
|
// These are the protobuf classes coming from Hadoop. Not the one from hbase-shaded-protobuf
|
||||||
|
protobufMessageLiteClass = com.google.protobuf.MessageLite.class;
|
||||||
|
protobufMessageLiteBuilderClass = com.google.protobuf.MessageLite.Builder.class;
|
||||||
|
|
||||||
|
try {
|
||||||
|
protobufMessageLiteClass = Class.forName("org.apache.hadoop.thirdparty.protobuf.MessageLite");
|
||||||
|
protobufMessageLiteBuilderClass = Class.forName(
|
||||||
|
"org.apache.hadoop.thirdparty.protobuf.MessageLite.Builder");
|
||||||
|
LOG.debug("Hadoop 3.3 and above shades protobuf.");
|
||||||
|
} catch (ClassNotFoundException e) {
|
||||||
|
LOG.debug("Hadoop 3.2 and below use unshaded protobuf.", e);
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
getParserForTypeMethod = protobufMessageLiteClass.getDeclaredMethod("getParserForType");
|
||||||
|
newBuilderForTypeMethod = protobufMessageLiteClass.getDeclaredMethod("newBuilderForType");
|
||||||
|
} catch (NoSuchMethodException e) {
|
||||||
|
// If the method is not found, we are in trouble. Abort.
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
protobufMessageLiteClass.getDeclaredMethod("getParserForType");
|
||||||
|
hasParser = true;
|
||||||
|
} catch (Throwable var2) {
|
||||||
|
}
|
||||||
|
|
||||||
|
HAS_PARSER = hasParser;
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue