HBASE-14598 ByteBufferOutputStream grows its HeapByteBuffer beyond JVM limitations (Ian Friedman)

This commit is contained in:
stack 2015-10-14 13:13:01 -07:00
parent 0818df79d3
commit 11066d0459
2 changed files with 23 additions and 7 deletions

View File

@ -22,6 +22,7 @@ import java.io.DataInput;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
import org.apache.commons.io.IOUtils;
@ -31,6 +32,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.io.BoundedByteBufferPool;
@ -154,6 +156,8 @@ public class IPCUtil {
// If no cells, don't mess around. Just return null (could be a bunch of existence checking
// gets or something -- stuff that does not return a cell).
if (count == 0) return null;
} catch (BufferOverflowException e) {
throw new DoNotRetryIOException(e);
} finally {
os.close();
if (poolCompressor != null) CodecPool.returnCompressor(poolCompressor);

View File

@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.io;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
@ -35,6 +36,10 @@ import org.apache.hadoop.hbase.util.Bytes;
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class ByteBufferOutputStream extends OutputStream {
// Borrowed from openJDK:
// http://grepcode.com/file/repository.grepcode.com/java/root/jdk/openjdk/8-b132/java/util/ArrayList.java#221
private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
protected ByteBuffer buf;
@ -66,6 +71,9 @@ public class ByteBufferOutputStream extends OutputStream {
}
private static ByteBuffer allocate(final int capacity, final boolean useDirectByteBuffer) {
if (capacity > MAX_ARRAY_SIZE) { // avoid OutOfMemoryError
throw new BufferOverflowException();
}
return useDirectByteBuffer? ByteBuffer.allocateDirect(capacity): ByteBuffer.allocate(capacity);
}
@ -79,13 +87,17 @@ public class ByteBufferOutputStream extends OutputStream {
}
private void checkSizeAndGrow(int extra) {
if ( (buf.position() + extra) > buf.limit()) {
// size calculation is complex, because we could overflow negative,
// and/or not allocate enough space. this fixes that.
int newSize = (int)Math.min((((long)buf.capacity()) * 2),
(long)(Integer.MAX_VALUE));
newSize = Math.max(newSize, buf.position() + extra);
ByteBuffer newBuf = allocate(newSize, buf.isDirect());
long capacityNeeded = buf.position() + (long) extra;
if (capacityNeeded > buf.limit()) {
// guarantee it's possible to fit
if (capacityNeeded > MAX_ARRAY_SIZE) {
throw new BufferOverflowException();
}
// double until hit the cap
long nextCapacity = Math.min(buf.capacity() * 2L, MAX_ARRAY_SIZE);
// but make sure there is enough if twice the existing capacity is still too small
nextCapacity = Math.max(nextCapacity, capacityNeeded);
ByteBuffer newBuf = allocate((int) nextCapacity, buf.isDirect());
buf.flip();
newBuf.put(buf);
buf = newBuf;