HBASE-13805 Use LimitInputStream in hbase-common instead of ProtobufUtil.LimitedInputStream. (Jingcheng)
parent a84e829e12
commit b5641d8edf
@@ -21,7 +21,6 @@ package org.apache.hadoop.hbase.protobuf;
 import static org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType.REGION_NAME;
 
 import java.io.ByteArrayOutputStream;
-import java.io.FilterInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.lang.reflect.Constructor;
@@ -39,7 +38,6 @@ import java.util.Map.Entry;
 import java.util.NavigableSet;
 import java.util.concurrent.TimeUnit;
 
-import com.google.protobuf.*;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
@@ -70,6 +68,7 @@ import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
 import org.apache.hadoop.hbase.exceptions.DeserializationException;
 import org.apache.hadoop.hbase.filter.ByteArrayComparable;
 import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.io.LimitInputStream;
 import org.apache.hadoop.hbase.io.TimeRange;
 import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos;
 import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.AccessControlService;
@@ -125,12 +124,12 @@ import org.apache.hadoop.hbase.protobuf.generated.RPCProtos;
 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerReportRequest;
 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor.FlushAction;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor.EventType;
-import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor;
 import org.apache.hadoop.hbase.quotas.QuotaScope;
 import org.apache.hadoop.hbase.quotas.QuotaType;
@@ -157,6 +156,15 @@ import org.apache.hadoop.security.token.Token;
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.ListMultimap;
 import com.google.common.collect.Lists;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.CodedInputStream;
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.Message;
+import com.google.protobuf.Parser;
+import com.google.protobuf.RpcChannel;
+import com.google.protobuf.Service;
+import com.google.protobuf.ServiceException;
+import com.google.protobuf.TextFormat;
 
 /**
  * Protobufs utility.
@@ -3019,7 +3027,6 @@ public final class ProtobufUtil {
     return desc.build();
   }
 
-
   /**
    * This version of protobuf's mergeDelimitedFrom avoid the hard-coded 64MB limit for decoding
    * buffers
@@ -3035,7 +3042,7 @@ public final class ProtobufUtil {
       // bail out. (was return false;)
     } else {
       final int size = CodedInputStream.readRawVarint32(firstByte, in);
-      final InputStream limitedInput = new LimitedInputStream(in, size);
+      final InputStream limitedInput = new LimitInputStream(in, size);
       final CodedInputStream codedInput = CodedInputStream.newInstance(limitedInput);
       codedInput.setSizeLimit(size);
       builder.mergeFrom(codedInput);
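For context, the method patched here is ProtobufUtil's own replacement for protobuf's mergeDelimitedFrom, described in the javadoc above: it reads the varint length prefix itself, then bounds both the raw stream and the decoder to exactly that many bytes, so the default 64MB decode ceiling never applies. A minimal sketch of the whole pattern follows; only the else-branch lines are quoted from the hunk, while the signature, the firstByte read, and the trailing tag check are assumptions reconstructed around it:

    // Sketch: length-prefixed protobuf read without the 64MB decode ceiling.
    // Only the lines shown in the hunk above are verbatim; the wrapper is assumed.
    public static void mergeDelimitedFrom(Message.Builder builder, InputStream in)
        throws IOException {
      // Read the first byte ourselves so EOF (no message at all) can be
      // distinguished from a zero-length message.
      final int firstByte = in.read();
      if (firstByte == -1) {
        // bail out. (was return false;)
      } else {
        final int size = CodedInputStream.readRawVarint32(firstByte, in);
        final InputStream limitedInput = new LimitInputStream(in, size);
        final CodedInputStream codedInput = CodedInputStream.newInstance(limitedInput);
        codedInput.setSizeLimit(size);   // raise the limit to the actual message size
        builder.mergeFrom(codedInput);
        codedInput.checkLastTagWas(0);   // verify the message ended cleanly
      }
    }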
@@ -3043,63 +3050,6 @@ public final class ProtobufUtil {
     }
   }
 
-  /**
-   * This is cut and paste from protobuf's package private AbstractMessageLite.
-   *
-   * An InputStream implementations which reads from some other InputStream
-   * but is limited to a particular number of bytes. Used by
-   * mergeDelimitedFrom(). This is intentionally package-private so that
-   * UnknownFieldSet can share it.
-   */
-  static final class LimitedInputStream extends FilterInputStream {
-    private int limit;
-
-    LimitedInputStream(InputStream in, int limit) {
-      super(in);
-      this.limit = limit;
-    }
-
-    @Override
-    public int available() throws IOException {
-      return Math.min(super.available(), limit);
-    }
-
-    @Override
-    public int read() throws IOException {
-      if (limit <= 0) {
-        return -1;
-      }
-      final int result = super.read();
-      if (result >= 0) {
-        --limit;
-      }
-      return result;
-    }
-
-    @Override
-    public int read(final byte[] b, final int off, int len)
-        throws IOException {
-      if (limit <= 0) {
-        return -1;
-      }
-      len = Math.min(len, limit);
-      final int result = super.read(b, off, len);
-      if (result >= 0) {
-        limit -= result;
-      }
-      return result;
-    }
-
-    @Override
-    public long skip(final long n) throws IOException {
-      final long result = super.skip(Math.min(n, limit));
-      if (result >= 0) {
-        limit -= result;
-      }
-      return result;
-    }
-  }
-
   public static ReplicationLoadSink toReplicationLoadSink(
       ClusterStatusProtos.ReplicationLoadSink cls) {
     return new ReplicationLoadSink(cls.getAgeOfLastAppliedOp(), cls.getTimeStampsOfLastAppliedOp());
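The class that replaces the deleted code, org.apache.hadoop.hbase.io.LimitInputStream in hbase-common, keeps the same contract: read(), available(), and skip() never consume more than the configured number of bytes, and EOF is reported once that budget is exhausted. A small usage sketch under those assumptions; the demo class is invented for illustration, and the constructor taking a long limit follows the Guava class that hbase-common adapted:

    import java.io.ByteArrayInputStream;
    import java.io.IOException;
    import java.io.InputStream;

    import org.apache.hadoop.hbase.io.LimitInputStream;

    public final class LimitInputStreamDemo {
      public static void main(String[] args) throws IOException {
        byte[] data = { 1, 2, 3, 4, 5 };
        // Cap the wrapped stream at 3 bytes; the last two stay unread.
        InputStream in = new LimitInputStream(new ByteArrayInputStream(data), 3);
        int count = 0;
        while (in.read() != -1) {
          count++;
        }
        System.out.println(count); // prints 3: EOF comes from the limit, not the source
      }
    }

Because the bound lives in the stream as well as in CodedInputStream.setSizeLimit, a corrupt or hostile length prefix cannot make the decoder run past the declared message into the bytes that follow it.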