From 39f896ee54f1af43eb7ac17c84f16c55a2c93c10 Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Thu, 13 Oct 2022 12:06:12 +0100 Subject: [PATCH] HADOOP-18487. protobuf 2.5 use extracted from RpcWritable * reflection + shortcut to look for class instanceof * ProtobufWrapperLegacy pulled out for better isolation Change-Id: I6f67841d9649648a733f7d89835957faffbfd520 --- .../hadoop/ipc/ProtobufWrapperLegacy.java | 122 ++++++++++++++++++ .../org/apache/hadoop/ipc/RpcWritable.java | 49 +------ .../org/apache/hadoop/ipc/package-info.java | 6 +- 3 files changed, 131 insertions(+), 46 deletions(-) create mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufWrapperLegacy.java diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufWrapperLegacy.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufWrapperLegacy.java new file mode 100644 index 00000000000..e00b1a80055 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufWrapperLegacy.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ipc; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.util.Preconditions; + +/** + * A RpcWritable wrapper for unshaded protobuf messages. + * This class isolates unshaded protobuf classes from + * the rest of the RPC codebase, so it can operate without + * needing that on the classpath at runtime. + * The classes are needed at compile time; and if + * unshaded protobuf messages are to be marshalled, they + * will need to be on the classpath then. + * That is implicit: it is impossible to pass in a class + * which is a protobuf message unless that condition is met. + */ +@InterfaceAudience.Private +class ProtobufWrapperLegacy extends RpcWritable { + + private com.google.protobuf.Message message; + + /** + * Construct. + * The type of the parameter is Object so as to keep the casting internal + * to this class. + * @param message message to wrap. + * @throws IllegalArgumentException if the class is not a protobuf message. + */ + ProtobufWrapperLegacy(Object message) { + Preconditions.checkArgument(isUnshadedProtobufMessage(message), + "message class is not an unshaded protobuf message %s", + message.getClass()); + this.message = (com.google.protobuf.Message) message; + } + + com.google.protobuf.Message getMessage() { + return message; + } + + @Override + void writeTo(ResponseBuffer out) throws IOException { + int length = message.getSerializedSize(); + length += com.google.protobuf.CodedOutputStream. + computeUInt32SizeNoTag(length); + out.ensureCapacity(length); + message.writeDelimitedTo(out); + } + + @SuppressWarnings("unchecked") + @Override + T readFrom(ByteBuffer bb) throws IOException { + // using the parser with a byte[]-backed coded input stream is the + // most efficient way to deserialize a protobuf. it has a direct + // path to the PB ctor that doesn't create multi-layered streams + // that internally buffer. + com.google.protobuf.CodedInputStream cis = + com.google.protobuf.CodedInputStream.newInstance( + bb.array(), bb.position() + bb.arrayOffset(), bb.remaining()); + try { + cis.pushLimit(cis.readRawVarint32()); + message = message.getParserForType().parseFrom(cis); + cis.checkLastTagWas(0); + } finally { + // advance over the bytes read. + bb.position(bb.position() + cis.getTotalBytesRead()); + } + return (T) message; + } + + /** + * Has protobuf been looked for and is known as absent? + * Saves a check on every message. + */ + private static final AtomicBoolean PROTOBUF_KNOWN_NOT_FOUND = new AtomicBoolean(false); + + /** + * Is a message an unshaded protobuf message? + * @param payload payload + * @return true if protobuf.jar is on the classpath and the payload is a Message + */ + static boolean isUnshadedProtobufMessage(Object payload) { + if (PROTOBUF_KNOWN_NOT_FOUND.get()) { + // protobuf is known to be absent. fail fast without examining + // jars or generating exceptions. + return false; + } + // load the protobuf message class. + // if it does not load, then the payload is guaranteed not to be + // an unshaded protobuf message + // this relies on classloader caching for performance + try { + Class protobufMessageClazz = Class.forName("com.google.protobuf.Message"); + return protobufMessageClazz.isAssignableFrom(payload.getClass()); + } catch (ClassNotFoundException e) { + PROTOBUF_KNOWN_NOT_FOUND.set(true); + return false; + } + + } +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcWritable.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcWritable.java index f5f0d071f39..c2dc57573f2 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcWritable.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcWritable.java @@ -41,9 +41,11 @@ public abstract class RpcWritable implements Writable { if (o instanceof RpcWritable) { return (RpcWritable)o; } else if (o instanceof Message) { + // hadoop shaded protobuf return new ProtobufWrapper((Message)o); - } else if (o instanceof com.google.protobuf.Message) { - return new ProtobufWrapperLegacy((com.google.protobuf.Message) o); + } else if (ProtobufWrapperLegacy.isUnshadedProtobufMessage(o)) { + // unshaded protobuf + return new ProtobufWrapperLegacy( o); } else if (o instanceof Writable) { return new WritableWrapper((Writable)o); } @@ -134,49 +136,6 @@ public abstract class RpcWritable implements Writable { } } - // adapter for Protobufs. - static class ProtobufWrapperLegacy extends RpcWritable { - private com.google.protobuf.Message message; - - ProtobufWrapperLegacy(com.google.protobuf.Message message) { - this.message = message; - } - - com.google.protobuf.Message getMessage() { - return message; - } - - @Override - void writeTo(ResponseBuffer out) throws IOException { - int length = message.getSerializedSize(); - length += com.google.protobuf.CodedOutputStream. - computeUInt32SizeNoTag(length); - out.ensureCapacity(length); - message.writeDelimitedTo(out); - } - - @SuppressWarnings("unchecked") - @Override - T readFrom(ByteBuffer bb) throws IOException { - // using the parser with a byte[]-backed coded input stream is the - // most efficient way to deserialize a protobuf. it has a direct - // path to the PB ctor that doesn't create multi-layered streams - // that internally buffer. - com.google.protobuf.CodedInputStream cis = - com.google.protobuf.CodedInputStream.newInstance( - bb.array(), bb.position() + bb.arrayOffset(), bb.remaining()); - try { - cis.pushLimit(cis.readRawVarint32()); - message = message.getParserForType().parseFrom(cis); - cis.checkLastTagWas(0); - } finally { - // advance over the bytes read. - bb.position(bb.position() + cis.getTotalBytesRead()); - } - return (T)message; - } - } - /** * adapter to allow decoding of writables and protobufs from a byte buffer. */ diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/package-info.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/package-info.java index cb35e938483..f91fa23194c 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/package-info.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/package-info.java @@ -18,8 +18,12 @@ /** * Tools to help define network clients and servers. + * Other ASF projects use this package, often with their own shaded/unshaded + * versions of protobuf messages. + * Changes to the API signatures will break things, especially changes to + * {@link org.apache.hadoop.ipc.RPC} and {@link org.apache.hadoop.ipc.RpcEngine}. */ -@InterfaceAudience.LimitedPrivate({"HBase", "HDFS", "MapReduce"}) +@InterfaceAudience.LimitedPrivate({"HBase", "HDFS", "MapReduce", "YARN", "Hive", "Ozone"}) @InterfaceStability.Evolving package org.apache.hadoop.ipc; import org.apache.hadoop.classification.InterfaceAudience;