diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/LimitInputStream.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/LimitInputStream.java new file mode 100644 index 00000000000..8a21f9edda1 --- /dev/null +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/LimitInputStream.java @@ -0,0 +1,104 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.io; + +import java.io.FilterInputStream; +import java.io.IOException; +import java.io.InputStream; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * Copied from guava source code v15 (LimitedInputStream) + * Guava deprecated LimitInputStream in v14 and removed it in v15. Copying this class here + * allows to be compatible with guava 11 to 15+. + */ +public final class LimitInputStream extends FilterInputStream { + private long left; + private long mark = -1; + + public LimitInputStream(InputStream in, long limit) { + super(in); + checkNotNull(in); + checkArgument(limit >= 0, "limit must be non-negative"); + left = limit; + } + + @Override + public int available() throws IOException { + return (int) Math.min(in.available(), left); + } + + // it's okay to mark even if mark isn't supported, as reset won't work + @Override + public synchronized void mark(int readLimit) { + in.mark(readLimit); + mark = left; + } + + @Override + public int read() throws IOException { + if (left == 0) { + return -1; + } + + int result = in.read(); + if (result != -1) { + --left; + } + return result; + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + if (left == 0) { + return -1; + } + + len = (int) Math.min(len, left); + int result = in.read(b, off, len); + if (result != -1) { + left -= result; + } + return result; + } + + @Override + public synchronized void reset() throws IOException { + if (!in.markSupported()) { + throw new IOException("Mark not supported"); + } + if (mark == -1) { + throw new IOException("Mark not set"); + } + + in.reset(); + left = mark; + } + + @Override + public long skip(long n) throws IOException { + n = Math.min(n, left); + long skipped = in.skip(n); + left -= skipped; + return skipped; + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java index 5fe5a46f12e..1f808eb5c01 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java @@ -25,13 +25,13 @@ import java.io.InputStream; import java.nio.ByteBuffer; import java.util.Arrays; -import org.apache.commons.io.input.BoundedInputStream; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.hbase.codec.Codec; +import org.apache.hadoop.hbase.io.LimitInputStream; import org.apache.hadoop.hbase.protobuf.generated.WALProtos; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALHeader.Builder; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey; @@ -226,7 +226,7 @@ public class ProtobufLogReader extends ReaderBase { "inputStream.available()= " + this.inputStream.available() + ", " + "entry size= " + size); } - final InputStream limitedInput = new BoundedInputStream(this.inputStream, size); + final InputStream limitedInput = new LimitInputStream(this.inputStream, size); builder.mergeFrom(limitedInput); } catch (InvalidProtocolBufferException ipbe) { throw (EOFException) new EOFException("Invalid PB, EOF? Ignoring; originalPosition=" +