From e5f7030dedb12e257c1095326b7e3b5e49010c5f Mon Sep 17 00:00:00 2001
From: Beluga Behr
Date: Fri, 5 Jan 2018 16:51:33 -0600
Subject: [PATCH] Revert "HBASE-19651 Remove LimitInputStream"

Revert to put Beluga in place as author.

This reverts commit 032fdc53de7bc1c59b2a44dda65d9fdf38eb9e3c.
---
 NOTICE.txt                                    |   1 +
 .../hbase/shaded/protobuf/ProtobufUtil.java   |   4 +-
 .../main/appended-resources/META-INF/NOTICE   |   7 ++
 .../hadoop/hbase/io/LimitInputStream.java     | 105 ++++++++++++++++++
 .../regionserver/wal/ProtobufLogReader.java   |  11 +-
 5 files changed, 120 insertions(+), 8 deletions(-)
 create mode 100644 hbase-common/src/main/appended-resources/META-INF/NOTICE
 create mode 100644 hbase-common/src/main/java/org/apache/hadoop/hbase/io/LimitInputStream.java

diff --git a/NOTICE.txt b/NOTICE.txt
index 9c238bed136..79598575acf 100755
--- a/NOTICE.txt
+++ b/NOTICE.txt
@@ -39,6 +39,7 @@ Licensed under the Apache License v2.0 as a part of the Bootstrap project.
 
 --
 This product includes portions of the Guava project v14 and v21, specifically
+'hbase-common/src/main/java/org/apache/hadoop/hbase/io/LimitInputStream.java'
 'hbase-common/src/main/java/org/apache/hadoop/hbase/util/Bytes.java'
 'hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java'
 
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
index f90d9dddc6a..0706129f7ed 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
@@ -85,6 +85,7 @@ import org.apache.hadoop.hbase.client.security.SecurityCapability;
 import org.apache.hadoop.hbase.exceptions.DeserializationException;
 import org.apache.hadoop.hbase.filter.ByteArrayComparable;
 import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.io.LimitInputStream;
 import org.apache.hadoop.hbase.io.TimeRange;
 import org.apache.hadoop.hbase.protobuf.ProtobufMagic;
 import org.apache.hadoop.hbase.protobuf.ProtobufMessageConverter;
@@ -105,7 +106,6 @@ import org.apache.hadoop.hbase.util.VersionInfo;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.yetus.audience.InterfaceAudience;
 
-import org.apache.hbase.thirdparty.com.google.common.io.ByteStreams;
 import org.apache.hbase.thirdparty.com.google.gson.JsonArray;
 import org.apache.hbase.thirdparty.com.google.gson.JsonElement;
 import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
@@ -2609,7 +2609,7 @@ public final class ProtobufUtil {
     final int firstByte = in.read();
     if (firstByte != -1) {
       final int size = CodedInputStream.readRawVarint32(firstByte, in);
-      final InputStream limitedInput = ByteStreams.limit(in, size);
+      final InputStream limitedInput = new LimitInputStream(in, size);
       final CodedInputStream codedInput = CodedInputStream.newInstance(limitedInput);
       codedInput.setSizeLimit(size);
       builder.mergeFrom(codedInput);
diff --git a/hbase-common/src/main/appended-resources/META-INF/NOTICE b/hbase-common/src/main/appended-resources/META-INF/NOTICE
new file mode 100644
index 00000000000..9176ececdc3
--- /dev/null
+++ b/hbase-common/src/main/appended-resources/META-INF/NOTICE
@@ -0,0 +1,7 @@
+--
+This product includes portions of the Guava project v14, specifically
+'hbase-common/src/main/java/org/apache/hadoop/hbase/io/LimitInputStream.java'
+
+Copyright (C) 2007 The Guava Authors
+
+Licensed under the Apache License, Version 2.0
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/LimitInputStream.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/LimitInputStream.java
new file mode 100644
index 00000000000..6eb710a13eb
--- /dev/null
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/LimitInputStream.java
@@ -0,0 +1,105 @@
+/*
+ * Copyright (C) 2007 The Guava Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.hbase.io;
+
+import static org.apache.hbase.thirdparty.com.google.common.base.Preconditions.checkArgument;
+import static org.apache.hbase.thirdparty.com.google.common.base.Preconditions.checkNotNull;
+
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Copied from guava source code v15 (LimitedInputStream)
+ * Guava deprecated LimitInputStream in v14 and removed it in v15. Copying this class here
+ * allows to be compatible with guava 11 to 15+.
+ */
+@InterfaceAudience.Private
+public final class LimitInputStream extends FilterInputStream {
+  private long left;
+  private long mark = -1;
+
+  public LimitInputStream(InputStream in, long limit) {
+    super(in);
+    checkNotNull(in);
+    checkArgument(limit >= 0, "limit must be non-negative");
+    left = limit;
+  }
+
+  @Override
+  public int available() throws IOException {
+    return (int) Math.min(in.available(), left);
+  }
+
+  // it's okay to mark even if mark isn't supported, as reset won't work
+  @Override
+  public synchronized void mark(int readLimit) {
+    in.mark(readLimit);
+    mark = left;
+  }
+
+  @Override
+  public int read() throws IOException {
+    if (left == 0) {
+      return -1;
+    }
+
+    int result = in.read();
+    if (result != -1) {
+      --left;
+    }
+    return result;
+  }
+
+  @Override
+  public int read(byte[] b, int off, int len) throws IOException {
+    if (left == 0) {
+      return -1;
+    }
+
+    len = (int) Math.min(len, left);
+    int result = in.read(b, off, len);
+    if (result != -1) {
+      left -= result;
+    }
+    return result;
+  }
+
+  @Override
+  public synchronized void reset() throws IOException {
+    if (!in.markSupported()) {
+      throw new IOException("Mark not supported");
+    }
+    if (mark == -1) {
+      throw new IOException("Mark not set");
+    }
+
+    in.reset();
+    left = mark;
+  }
+
+  @Override
+  public long skip(long n) throws IOException {
+    n = Math.min(n, left);
+    long skipped = in.skip(n);
+    left -= skipped;
+    return skipped;
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java
index ebb6079b088..cba2c61959b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java
@@ -26,11 +26,15 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.hbase.codec.Codec;
+import org.apache.hadoop.hbase.io.LimitInputStream;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
@@ -39,12 +43,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALKey;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALTrailer;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 
-import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.hbase.thirdparty.com.google.common.io.ByteStreams;
 import org.apache.hbase.thirdparty.com.google.protobuf.CodedInputStream;
 import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
 
@@ -351,7 +350,7 @@ public class ProtobufLogReader extends ReaderBase {
             "inputStream.available()= " + this.inputStream.available() + ", " +
             "entry size= " + size + " at offset = " + this.inputStream.getPos());
         }
-        ProtobufUtil.mergeFrom(builder, ByteStreams.limit(this.inputStream, size),
+        ProtobufUtil.mergeFrom(builder, new LimitInputStream(this.inputStream, size),
          (int)size);
       } catch (InvalidProtocolBufferException ipbe) {
         throw (EOFException) new EOFException("Invalid PB, EOF? Ignoring; originalPosition=" +
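
For reference, the class restored above does one thing: it caps how many bytes a caller can pull from a wrapped InputStream, which is what both touched call sites rely on when handing a size-delimited slice of a larger stream (read a varint length, then parse only that many bytes) to the protobuf parser. Below is a minimal usage sketch assuming only the LimitInputStream class added by this patch; the example class name and the sample byte array are illustrative and not HBase code.

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

import org.apache.hadoop.hbase.io.LimitInputStream;

public class LimitInputStreamExample {
  public static void main(String[] args) throws IOException {
    byte[] data = { 1, 2, 3, 4, 5, 6, 7, 8 };
    InputStream in = new ByteArrayInputStream(data);

    // Cap the wrapped stream at 3 bytes; the underlying stream is otherwise untouched.
    InputStream limited = new LimitInputStream(in, 3);

    int count = 0;
    while (limited.read() != -1) {
      count++;
    }
    // Prints 3: the limit, not the full 8 bytes, bounds what read() can return.
    System.out.println("bytes read through limit: " + count);

    // The remaining 5 bytes are still available on the original stream.
    System.out.println("bytes remaining underneath: " + in.available());
  }
}

This mirrors the pattern in ProtobufUtil.mergeFrom and ProtobufLogReader: wrap the shared stream with the record's declared size as the limit, so the parser can read to "end of stream" without overrunning into the next record.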