HBASE-14598 ByteBufferOutputStream grows its HeapByteBuffer beyond JVM limitations (Ian Friedman)
This commit is contained in:
parent
ba3d474f8a
commit
940e5404df
|
@ -22,6 +22,7 @@ import java.io.DataInput;
|
|||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.nio.BufferOverflowException;
|
||||
import java.nio.ByteBuffer;
|
||||
|
||||
import org.apache.commons.io.IOUtils;
|
||||
|
@ -31,6 +32,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
|||
import org.apache.hadoop.conf.Configurable;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.CellScanner;
|
||||
import org.apache.hadoop.hbase.DoNotRetryIOException;
|
||||
import org.apache.hadoop.hbase.HBaseIOException;
|
||||
import org.apache.hadoop.hbase.codec.Codec;
|
||||
import org.apache.hadoop.hbase.io.BoundedByteBufferPool;
|
||||
|
@ -154,6 +156,8 @@ public class IPCUtil {
|
|||
// If no cells, don't mess around. Just return null (could be a bunch of existence checking
|
||||
// gets or something -- stuff that does not return a cell).
|
||||
if (count == 0) return null;
|
||||
} catch (BufferOverflowException e) {
|
||||
throw new DoNotRetryIOException(e);
|
||||
} finally {
|
||||
os.close();
|
||||
if (poolCompressor != null) CodecPool.returnCompressor(poolCompressor);
|
||||
|
|
|
@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.io;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
import java.nio.BufferOverflowException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.ByteOrder;
|
||||
import java.nio.channels.Channels;
|
||||
|
@ -37,6 +38,10 @@ import org.apache.hadoop.hbase.util.Bytes;
|
|||
@InterfaceAudience.Public
|
||||
@InterfaceStability.Evolving
|
||||
public class ByteBufferOutputStream extends OutputStream {
|
||||
|
||||
// Borrowed from openJDK:
|
||||
// http://grepcode.com/file/repository.grepcode.com/java/root/jdk/openjdk/8-b132/java/util/ArrayList.java#221
|
||||
private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
|
||||
|
||||
protected ByteBuffer buf;
|
||||
|
||||
|
@ -69,6 +74,9 @@ public class ByteBufferOutputStream extends OutputStream {
|
|||
}
|
||||
|
||||
private static ByteBuffer allocate(final int capacity, final boolean useDirectByteBuffer) {
|
||||
if (capacity > MAX_ARRAY_SIZE) { // avoid OutOfMemoryError
|
||||
throw new BufferOverflowException();
|
||||
}
|
||||
return useDirectByteBuffer? ByteBuffer.allocateDirect(capacity): ByteBuffer.allocate(capacity);
|
||||
}
|
||||
|
||||
|
@ -82,13 +90,17 @@ public class ByteBufferOutputStream extends OutputStream {
|
|||
}
|
||||
|
||||
private void checkSizeAndGrow(int extra) {
|
||||
if ( (buf.position() + extra) > buf.limit()) {
|
||||
// size calculation is complex, because we could overflow negative,
|
||||
// and/or not allocate enough space. this fixes that.
|
||||
int newSize = (int)Math.min((((long)buf.capacity()) * 2),
|
||||
(long)(Integer.MAX_VALUE));
|
||||
newSize = Math.max(newSize, buf.position() + extra);
|
||||
ByteBuffer newBuf = allocate(newSize, buf.isDirect());
|
||||
long capacityNeeded = buf.position() + (long) extra;
|
||||
if (capacityNeeded > buf.limit()) {
|
||||
// guarantee it's possible to fit
|
||||
if (capacityNeeded > MAX_ARRAY_SIZE) {
|
||||
throw new BufferOverflowException();
|
||||
}
|
||||
// double until hit the cap
|
||||
long nextCapacity = Math.min(buf.capacity() * 2L, MAX_ARRAY_SIZE);
|
||||
// but make sure there is enough if twice the existing capacity is still too small
|
||||
nextCapacity = Math.max(nextCapacity, capacityNeeded);
|
||||
ByteBuffer newBuf = allocate((int) nextCapacity, buf.isDirect());
|
||||
buf.flip();
|
||||
ByteBufferUtils.copyFromBufferToBuffer(buf, newBuf);
|
||||
buf = newBuf;
|
||||
|
|
Loading…
Reference in New Issue