diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncServerResponseHandler.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncServerResponseHandler.java index d71bf5ec54a..43028c60ddc 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncServerResponseHandler.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncServerResponseHandler.java @@ -26,6 +26,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.generated.RPCProtos; import org.apache.hadoop.ipc.RemoteException; @@ -92,7 +93,7 @@ public class AsyncServerResponseHandler extends ChannelInboundHandlerAdapter { // Call may be null because it may have timedout and been cleaned up on this side already if (call.responseDefaultType != null) { Message.Builder builder = call.responseDefaultType.newBuilderForType(); - builder.mergeDelimitedFrom(in); + ProtobufUtil.mergeDelimitedFrom(builder, in); value = builder.build(); } CellScanner cellBlockScanner = null; diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java index 9400a2cd9d1..dad51640cf3 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java @@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.codec.Codec; import org.apache.hadoop.hbase.exceptions.ConnectionClosingException; +import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos; import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.CellBlockMeta; import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ConnectionHeader; @@ -950,7 +951,7 @@ public class RpcClientImpl extends AbstractRpcClient { Message value = null; if (call.responseDefaultType != null) { Builder builder = call.responseDefaultType.newBuilderForType(); - builder.mergeDelimitedFrom(in); + ProtobufUtil.mergeDelimitedFrom(builder, in); value = builder.build(); } CellScanner cellBlockScanner = null; diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java index caae1bbbaec..7bb9de1335c 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java @@ -21,7 +21,9 @@ package org.apache.hadoop.hbase.protobuf; import static org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType.REGION_NAME; import java.io.ByteArrayOutputStream; +import java.io.FilterInputStream; import java.io.IOException; +import java.io.InputStream; import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; @@ -37,6 +39,7 @@ import java.util.Map.Entry; import java.util.NavigableSet; import java.util.concurrent.TimeUnit; +import com.google.protobuf.*; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; @@ -149,14 +152,6 @@ import org.apache.hadoop.security.token.Token; import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.ListMultimap; import com.google.common.collect.Lists; -import com.google.protobuf.ByteString; -import com.google.protobuf.InvalidProtocolBufferException; -import com.google.protobuf.Message; -import com.google.protobuf.Parser; -import com.google.protobuf.RpcChannel; -import com.google.protobuf.Service; -import com.google.protobuf.ServiceException; -import com.google.protobuf.TextFormat; /** * Protobufs utility. @@ -2994,4 +2989,86 @@ public final class ProtobufUtil { return desc.build(); } + + + + /** + * This version of protobuf's mergeDelimitedFrom avoid the hard-coded 64MB limit for decoding + * buffers + * @param builder current message builder + * @param in Inputsream with delimited protobuf data + * @throws IOException + */ + public static void mergeDelimitedFrom(Message.Builder builder, InputStream in) throws IOException { + // This used to be builder.mergeDelimitedFrom(in); + // but is replaced to allow us to bump the protobuf size limit. + final int firstByte = in.read(); + if (firstByte == -1) { + // bail out. (was return false;) + } else { + final int size = CodedInputStream.readRawVarint32(firstByte, in); + final InputStream limitedInput = new LimitedInputStream(in, size); + final CodedInputStream codedInput = CodedInputStream.newInstance(limitedInput); + codedInput.setSizeLimit(size); + builder.mergeFrom(codedInput); + codedInput.checkLastTagWas(0); + } + } + + /** + * This is cut and paste from protobuf's package private AbstractMessageLite. + * + * An InputStream implementations which reads from some other InputStream + * but is limited to a particular number of bytes. Used by + * mergeDelimitedFrom(). This is intentionally package-private so that + * UnknownFieldSet can share it. + */ + static final class LimitedInputStream extends FilterInputStream { + private int limit; + + LimitedInputStream(InputStream in, int limit) { + super(in); + this.limit = limit; + } + + @Override + public int available() throws IOException { + return Math.min(super.available(), limit); + } + + @Override + public int read() throws IOException { + if (limit <= 0) { + return -1; + } + final int result = super.read(); + if (result >= 0) { + --limit; + } + return result; + } + + @Override + public int read(final byte[] b, final int off, int len) + throws IOException { + if (limit <= 0) { + return -1; + } + len = Math.min(len, limit); + final int result = super.read(b, off, len); + if (result >= 0) { + limit -= result; + } + return result; + } + + @Override + public long skip(final long n) throws IOException { + final long result = super.skip(Math.min(n, limit)); + if (result >= 0) { + limit -= result; + } + return result; + } + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobStoreScanner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobStoreScanner.java index 1112b12704a..27a0b062a5f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobStoreScanner.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobStoreScanner.java @@ -31,13 +31,9 @@ import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.client.*; +import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.HBaseAdmin; -import org.apache.hadoop.hbase.client.HTable; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.mob.MobConstants; import org.apache.hadoop.hbase.mob.MobUtils; import org.apache.hadoop.hbase.testclassification.MediumTests; @@ -70,6 +66,7 @@ public class TestMobStoreScanner { public static void setUpBeforeClass() throws Exception { TEST_UTIL.getConfiguration().setInt("hbase.master.info.port", 0); TEST_UTIL.getConfiguration().setBoolean("hbase.regionserver.info.port.auto", true); + TEST_UTIL.getConfiguration().setInt("hbase.client.keyvalue.maxsize", 100*1024*1024); TEST_UTIL.startMiniCluster(1); } @@ -136,6 +133,26 @@ public class TestMobStoreScanner { testGetFromArchive(true); } + @Test(timeout=60000) + public void testGetMassive() throws Exception { + String TN = "testGetMassive"; + setUp(defaultThreshold, TN); + + // Put some data 5 10, 15, 20 mb ok (this would be right below protobuf default max size of 64MB. + // 25, 30, 40 fail. these is above protobuf max size of 64MB + byte[] bigValue = new byte[25*1024*1024]; + + Put put = new Put(row1); + put.add(family, qf1, bigValue); + put.add(family, qf2, bigValue); + put.add(family, qf3, bigValue); + table.put(put); + + Get g = new Get(row1); + Result r = table.get(g); + // should not have blown up. + } + public void testGetFromFiles(boolean reversed) throws Exception { String TN = "testGetFromFiles" + reversed; setUp(defaultThreshold, TN);