HBASE-23833. The relocated hadoop-thirdparty protobuf breaks HBase asyncwal (#1301)
* Use Reflection to access shaded Hadoop protobuf classes.
(cherry picked from commit a321e536989083ca3620bf2c53f12c07740bf5b0)
* Update to improve the code:
1. Added license.
2. Added more comments.
3. Wrap byte array instead of copy to make a ByteString.
4. Moved all reflection instantiation to static class loading time.
* Use LiteralByteString to wrap byte array instead of copying it.
Signed-off-by: Duo Zhang <zhangduo@apache.org>
Signed-off-by: stack <stack@apache.org>
(cherry picked from commit 72727ff9be
)
This commit is contained in:
parent
3f43b9e349
commit
ae3de38bed
|
@ -69,7 +69,6 @@ import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandler.Sharable;
|
|||
import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
|
||||
import org.apache.hbase.thirdparty.io.netty.channel.ChannelId;
|
||||
import org.apache.hbase.thirdparty.io.netty.channel.SimpleChannelInboundHandler;
|
||||
import org.apache.hbase.thirdparty.io.netty.handler.codec.protobuf.ProtobufDecoder;
|
||||
import org.apache.hbase.thirdparty.io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
|
||||
import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateEvent;
|
||||
import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateHandler;
|
||||
|
|
|
@ -99,7 +99,6 @@ import org.apache.hbase.thirdparty.io.netty.channel.ChannelPipeline;
|
|||
import org.apache.hbase.thirdparty.io.netty.channel.EventLoop;
|
||||
import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
|
||||
import org.apache.hbase.thirdparty.io.netty.channel.SimpleChannelInboundHandler;
|
||||
import org.apache.hbase.thirdparty.io.netty.handler.codec.protobuf.ProtobufDecoder;
|
||||
import org.apache.hbase.thirdparty.io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
|
||||
import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateEvent;
|
||||
import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateHandler;
|
||||
|
|
|
@ -20,9 +20,9 @@ package org.apache.hadoop.hbase.io.asyncfs;
|
|||
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY;
|
||||
import static org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleState.READER_IDLE;
|
||||
|
||||
import com.google.protobuf.ByteString;
|
||||
import com.google.protobuf.CodedOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.lang.reflect.Constructor;
|
||||
import java.lang.reflect.Field;
|
||||
import java.lang.reflect.InvocationTargetException;
|
||||
import java.lang.reflect.Method;
|
||||
|
@ -93,7 +93,6 @@ import org.apache.hbase.thirdparty.io.netty.channel.ChannelPromise;
|
|||
import org.apache.hbase.thirdparty.io.netty.channel.SimpleChannelInboundHandler;
|
||||
import org.apache.hbase.thirdparty.io.netty.handler.codec.LengthFieldBasedFrameDecoder;
|
||||
import org.apache.hbase.thirdparty.io.netty.handler.codec.MessageToByteEncoder;
|
||||
import org.apache.hbase.thirdparty.io.netty.handler.codec.protobuf.ProtobufDecoder;
|
||||
import org.apache.hbase.thirdparty.io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
|
||||
import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateEvent;
|
||||
import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateHandler;
|
||||
|
@ -355,15 +354,93 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
|
|||
return Collections.singletonList(new CipherOption(CipherSuite.AES_CTR_NOPADDING));
|
||||
}
|
||||
|
||||
/**
|
||||
* The asyncfs subsystem emulates a HDFS client by sending protobuf messages via netty.
|
||||
* After Hadoop 3.3.0, the protobuf classes are relocated to org.apache.hadoop.thirdparty.protobuf.*.
|
||||
* Use Reflection to check which ones to use.
|
||||
*/
|
||||
private static class BuilderPayloadSetter {
|
||||
private static Method setPayloadMethod;
|
||||
private static Constructor<?> constructor;
|
||||
|
||||
/**
|
||||
* Create a ByteString from byte array without copying (wrap), and then set it as the payload
|
||||
* for the builder.
|
||||
*
|
||||
* @param builder builder for HDFS DataTransferEncryptorMessage.
|
||||
* @param payload byte array of payload.
|
||||
* @throws IOException
|
||||
*/
|
||||
static void wrapAndSetPayload(DataTransferEncryptorMessageProto.Builder builder, byte[] payload)
|
||||
throws IOException {
|
||||
Object byteStringObject;
|
||||
try {
|
||||
// byteStringObject = new LiteralByteString(payload);
|
||||
byteStringObject = constructor.newInstance(payload);
|
||||
// builder.setPayload(byteStringObject);
|
||||
setPayloadMethod.invoke(builder, constructor.getDeclaringClass().cast(byteStringObject));
|
||||
} catch (IllegalAccessException | InstantiationException e) {
|
||||
throw new RuntimeException(e);
|
||||
|
||||
} catch (InvocationTargetException e) {
|
||||
Throwables.propagateIfPossible(e.getTargetException(), IOException.class);
|
||||
throw new RuntimeException(e.getTargetException());
|
||||
}
|
||||
}
|
||||
|
||||
static {
|
||||
Class<?> builderClass = DataTransferEncryptorMessageProto.Builder.class;
|
||||
|
||||
// Try the unrelocated ByteString
|
||||
Class<?> byteStringClass = com.google.protobuf.ByteString.class;
|
||||
try {
|
||||
// See if it can load the relocated ByteString, which comes from hadoop-thirdparty.
|
||||
byteStringClass = Class.forName("org.apache.hadoop.thirdparty.protobuf.ByteString");
|
||||
LOG.debug("Found relocated ByteString class from hadoop-thirdparty." +
|
||||
" Assuming this is Hadoop 3.3.0+.");
|
||||
} catch (ClassNotFoundException e) {
|
||||
LOG.debug("Did not find relocated ByteString class from hadoop-thirdparty." +
|
||||
" Assuming this is below Hadoop 3.3.0", e);
|
||||
}
|
||||
|
||||
// LiteralByteString is a package private class in protobuf. Make it accessible.
|
||||
Class<?> literalByteStringClass;
|
||||
try {
|
||||
literalByteStringClass = Class.forName(
|
||||
"org.apache.hadoop.thirdparty.protobuf.ByteString$LiteralByteString");
|
||||
LOG.debug("Shaded LiteralByteString from hadoop-thirdparty is found.");
|
||||
} catch (ClassNotFoundException e) {
|
||||
try {
|
||||
literalByteStringClass = Class.forName("com.google.protobuf.LiteralByteString");
|
||||
LOG.debug("com.google.protobuf.LiteralByteString found.");
|
||||
} catch (ClassNotFoundException ex) {
|
||||
throw new RuntimeException(ex);
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
constructor = literalByteStringClass.getDeclaredConstructor(byte[].class);
|
||||
constructor.setAccessible(true);
|
||||
} catch (NoSuchMethodException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
|
||||
try {
|
||||
setPayloadMethod = builderClass.getMethod("setPayload", byteStringClass);
|
||||
} catch (NoSuchMethodException e) {
|
||||
// if either method is not found, we are in big trouble. Abort.
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void sendSaslMessage(ChannelHandlerContext ctx, byte[] payload,
|
||||
List<CipherOption> options) throws IOException {
|
||||
DataTransferEncryptorMessageProto.Builder builder =
|
||||
DataTransferEncryptorMessageProto.newBuilder();
|
||||
builder.setStatus(DataTransferEncryptorStatus.SUCCESS);
|
||||
if (payload != null) {
|
||||
// Was ByteStringer; fix w/o using ByteStringer. Its in hbase-protocol
|
||||
// and we want to keep that out of hbase-server.
|
||||
builder.setPayload(ByteString.copyFrom(payload));
|
||||
BuilderPayloadSetter.wrapAndSetPayload(builder, payload);
|
||||
}
|
||||
if (options != null) {
|
||||
builder.addAllCipherOption(PBHelperClient.convertCipherOptions(options));
|
||||
|
|
|
@ -0,0 +1,148 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.io.asyncfs;
|
||||
|
||||
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
|
||||
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufUtil;
|
||||
import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
|
||||
import org.apache.hbase.thirdparty.io.netty.handler.codec.MessageToMessageDecoder;
|
||||
import org.apache.hbase.thirdparty.io.netty.util.internal.ObjectUtil;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.lang.reflect.InvocationTargetException;
|
||||
import java.lang.reflect.Method;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* Modified based on io.netty.handler.codec.protobuf.ProtobufDecoder.
|
||||
* The Netty's ProtobufDecode supports unshaded protobuf messages (com.google.protobuf).
|
||||
*
|
||||
* Hadoop 3.3.0 and above relocates protobuf classes to a shaded jar (hadoop-thirdparty), and
|
||||
* so we must use reflection to detect which one (relocated or not) to use.
|
||||
*
|
||||
* Do not use this to process HBase's shaded protobuf messages. This is meant to process the
|
||||
* protobuf messages in HDFS for the asyncfs use case.
|
||||
* */
|
||||
@InterfaceAudience.Private
|
||||
public class ProtobufDecoder extends MessageToMessageDecoder<ByteBuf> {
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(ProtobufDecoder.class);
|
||||
|
||||
private static Class<?> protobufMessageLiteClass = null;
|
||||
private static Class<?> protobufMessageLiteBuilderClass = null;
|
||||
|
||||
private static final boolean HAS_PARSER;
|
||||
|
||||
private static Method getParserForTypeMethod;
|
||||
private static Method newBuilderForTypeMethod;
|
||||
|
||||
private Method parseFromMethod;
|
||||
private Method mergeFromMethod;
|
||||
private Method buildMethod;
|
||||
|
||||
private Object parser;
|
||||
private Object builder;
|
||||
|
||||
|
||||
public ProtobufDecoder(Object prototype) {
|
||||
try {
|
||||
Method getDefaultInstanceForTypeMethod = protobufMessageLiteClass.getMethod(
|
||||
"getDefaultInstanceForType");
|
||||
Object prototype1 = getDefaultInstanceForTypeMethod
|
||||
.invoke(ObjectUtil.checkNotNull(prototype, "prototype"));
|
||||
|
||||
// parser = prototype.getParserForType()
|
||||
parser = getParserForTypeMethod.invoke(prototype1);
|
||||
parseFromMethod = parser.getClass().getMethod(
|
||||
"parseFrom", byte[].class, int.class, int.class);
|
||||
|
||||
// builder = prototype.newBuilderForType();
|
||||
builder = newBuilderForTypeMethod.invoke(prototype1);
|
||||
mergeFromMethod = builder.getClass().getMethod(
|
||||
"mergeFrom", byte[].class, int.class, int.class);
|
||||
|
||||
// All protobuf message builders inherits from MessageLite.Builder
|
||||
buildMethod = protobufMessageLiteBuilderClass.getDeclaredMethod("build");
|
||||
|
||||
} catch (IllegalAccessException | NoSuchMethodException e) {
|
||||
throw new RuntimeException(e);
|
||||
} catch (InvocationTargetException e) {
|
||||
throw new RuntimeException(e.getTargetException());
|
||||
}
|
||||
}
|
||||
|
||||
protected void decode(
|
||||
ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
|
||||
int length = msg.readableBytes();
|
||||
byte[] array;
|
||||
int offset;
|
||||
if (msg.hasArray()) {
|
||||
array = msg.array();
|
||||
offset = msg.arrayOffset() + msg.readerIndex();
|
||||
} else {
|
||||
array = ByteBufUtil.getBytes(msg, msg.readerIndex(), length, false);
|
||||
offset = 0;
|
||||
}
|
||||
|
||||
Object addObj;
|
||||
if (HAS_PARSER) {
|
||||
// addObj = parser.parseFrom(array, offset, length);
|
||||
addObj = parseFromMethod.invoke(parser, array, offset, length);
|
||||
} else {
|
||||
// addObj = builder.mergeFrom(array, offset, length).build();
|
||||
Object builderObj = mergeFromMethod.invoke(builder, array, offset, length);
|
||||
addObj = buildMethod.invoke(builderObj);
|
||||
}
|
||||
out.add(addObj);
|
||||
}
|
||||
|
||||
static {
|
||||
boolean hasParser = false;
|
||||
|
||||
// These are the protobuf classes coming from Hadoop. Not the one from hbase-shaded-protobuf
|
||||
protobufMessageLiteClass = com.google.protobuf.MessageLite.class;
|
||||
protobufMessageLiteBuilderClass = com.google.protobuf.MessageLite.Builder.class;
|
||||
|
||||
try {
|
||||
protobufMessageLiteClass = Class.forName("org.apache.hadoop.thirdparty.protobuf.MessageLite");
|
||||
protobufMessageLiteBuilderClass = Class.forName(
|
||||
"org.apache.hadoop.thirdparty.protobuf.MessageLite.Builder");
|
||||
LOG.debug("Hadoop 3.3 and above shades protobuf.");
|
||||
} catch (ClassNotFoundException e) {
|
||||
LOG.debug("Hadoop 3.2 and below use unshaded protobuf.", e);
|
||||
}
|
||||
|
||||
try {
|
||||
getParserForTypeMethod = protobufMessageLiteClass.getDeclaredMethod("getParserForType");
|
||||
newBuilderForTypeMethod = protobufMessageLiteClass.getDeclaredMethod("newBuilderForType");
|
||||
} catch (NoSuchMethodException e) {
|
||||
// If the method is not found, we are in trouble. Abort.
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
|
||||
try {
|
||||
protobufMessageLiteClass.getDeclaredMethod("getParserForType");
|
||||
hasParser = true;
|
||||
} catch (Throwable var2) {
|
||||
}
|
||||
|
||||
HAS_PARSER = hasParser;
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue