HBASE-15788 Use Offheap ByteBuffers from BufferPool to read RPC requests.

Author: anoopsamjohn
Date: 2016-11-14 23:05:05 +05:30
parent 9250bf8091
commit c3685760f0
29 changed files with 699 additions and 124 deletions

View File

@@ -29,6 +29,7 @@ import java.util.TreeMap;
 import java.util.UUID;
 import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellComparator;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
@@ -319,9 +320,7 @@ public class Put extends Mutation implements HeapSize, Comparable<Row> {
     byte [] family = CellUtil.cloneFamily(kv);
     List<Cell> list = getCellList(family);
     //Checking that the row of the kv is the same as the put
-    int res = Bytes.compareTo(this.row, 0, row.length,
-        kv.getRowArray(), kv.getRowOffset(), kv.getRowLength());
-    if (res != 0) {
+    if (!CellUtil.matchingRow(kv, this.row)) {
       throw new WrongRowIOException("The row in " + kv.toString() +
         " doesn't match the original one " + Bytes.toStringBinary(this.row));
     }

View File

@@ -21,7 +21,9 @@ import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;
 import io.netty.buffer.ByteBufOutputStream;
+import java.io.ByteArrayInputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.OutputStream;
 import java.nio.BufferOverflowException;
 import java.nio.ByteBuffer;
@@ -35,10 +37,13 @@ import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.codec.Codec;
+import org.apache.hadoop.hbase.io.ByteBuffInputStream;
 import org.apache.hadoop.hbase.io.ByteBufferInputStream;
 import org.apache.hadoop.hbase.io.ByteBufferListOutputStream;
 import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
 import org.apache.hadoop.hbase.io.ByteBufferPool;
+import org.apache.hadoop.hbase.nio.ByteBuff;
+import org.apache.hadoop.hbase.nio.SingleByteBuff;
 import org.apache.hadoop.hbase.util.ClassSize;
 import org.apache.hadoop.io.compress.CodecPool;
 import org.apache.hadoop.io.compress.CompressionCodec;
@@ -238,15 +243,15 @@ class CellBlockBuilder {
   public CellScanner createCellScanner(final Codec codec, final CompressionCodec compressor,
       final byte[] cellBlock) throws IOException {
     // Use this method from Client side to create the CellScanner
-    ByteBuffer cellBlockBuf = ByteBuffer.wrap(cellBlock);
     if (compressor != null) {
-      cellBlockBuf = decompress(compressor, cellBlockBuf);
+      ByteBuffer cellBlockBuf = decompress(compressor, cellBlock);
+      return codec.getDecoder(new ByteBufferInputStream(cellBlockBuf));
     }
     // Not making the Decoder over the ByteBuffer purposefully. The Decoder over the BB will
     // make Cells directly over the passed BB. This method is called at client side and we don't
     // want the Cells to share the same byte[] where the RPC response is being read. Caching of any
     // of the Cells at user's app level will make it not possible to GC the response byte[]
-    return codec.getDecoder(new ByteBufferInputStream(cellBlockBuf));
+    return codec.getDecoder(new ByteArrayInputStream(cellBlock));
   }

   /**
@@ -258,7 +263,7 @@ class CellBlockBuilder {
    * @throws IOException if cell encoding fails
    */
   public CellScanner createCellScannerReusingBuffers(final Codec codec,
-      final CompressionCodec compressor, ByteBuffer cellBlock) throws IOException {
+      final CompressionCodec compressor, ByteBuff cellBlock) throws IOException {
     // Use this method from HRS to create the CellScanner
     // If compressed, decompress it first before passing it on else we will leak compression
     // resources if the stream is not closed properly after we let it out.
@@ -268,27 +273,38 @@ class CellBlockBuilder {
     return codec.getDecoder(cellBlock);
   }

-  private ByteBuffer decompress(CompressionCodec compressor, ByteBuffer cellBlock)
+  private ByteBuffer decompress(CompressionCodec compressor, byte[] compressedCellBlock)
       throws IOException {
+    ByteBuffer cellBlock = decompress(compressor, new ByteArrayInputStream(compressedCellBlock),
+        compressedCellBlock.length * this.cellBlockDecompressionMultiplier);
+    return cellBlock;
+  }
+
+  private ByteBuff decompress(CompressionCodec compressor, ByteBuff compressedCellBlock)
+      throws IOException {
+    ByteBuffer cellBlock = decompress(compressor, new ByteBuffInputStream(compressedCellBlock),
+        compressedCellBlock.remaining() * this.cellBlockDecompressionMultiplier);
+    return new SingleByteBuff(cellBlock);
+  }
+
+  private ByteBuffer decompress(CompressionCodec compressor, InputStream cellBlockStream,
+      int osInitialSize) throws IOException {
     // GZIPCodec fails w/ NPE if no configuration.
     if (compressor instanceof Configurable) {
       ((Configurable) compressor).setConf(this.conf);
     }
     Decompressor poolDecompressor = CodecPool.getDecompressor(compressor);
-    CompressionInputStream cis = compressor.createInputStream(new ByteBufferInputStream(cellBlock),
-        poolDecompressor);
+    CompressionInputStream cis = compressor.createInputStream(cellBlockStream, poolDecompressor);
     ByteBufferOutputStream bbos;
     try {
       // TODO: This is ugly. The buffer will be resized on us if we guess wrong.
       // TODO: Reuse buffers.
-      bbos = new ByteBufferOutputStream(
-          cellBlock.remaining() * this.cellBlockDecompressionMultiplier);
+      bbos = new ByteBufferOutputStream(osInitialSize);
       IOUtils.copy(cis, bbos);
       bbos.close();
-      cellBlock = bbos.getByteBuffer();
+      return bbos.getByteBuffer();
     } finally {
       CodecPool.returnDecompressor(poolDecompressor);
     }
-    return cellBlock;
   }
 }
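To make the client/server split above concrete, here is a minimal illustrative sketch of the two decode paths this file now exposes. It is not code from the commit: it assumes package access to the package-private CellBlockBuilder and that the codec, compressor and cell-block inputs come from the surrounding RPC code.

import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.nio.SingleByteBuff;
import org.apache.hadoop.io.compress.CompressionCodec;

class CellBlockDecodePaths {
  // Server side (HRS): the request cell block already sits in a pooled, possibly
  // off-heap buffer, so the Decoder walks it in place without copying.
  static CellScanner serverDecode(CellBlockBuilder builder, Codec codec,
      CompressionCodec compressor, ByteBuffer pooledRequestBuffer) throws IOException {
    return builder.createCellScannerReusingBuffers(codec, compressor,
        new SingleByteBuff(pooledRequestBuffer));
  }

  // Client side: the response cell block is a byte[]; Cells are copied out of a
  // ByteArrayInputStream so the response array can be GC'd even if the application
  // caches the Cells it gets back.
  static CellScanner clientDecode(CellBlockBuilder builder, Codec codec,
      CompressionCodec compressor, byte[] responseCellBlock) throws IOException {
    return builder.createCellScanner(codec, compressor, responseCellBlock);
  }
}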

View File

@@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.codec.Codec;
 import org.apache.hadoop.hbase.codec.KeyValueCodec;
 import org.apache.hadoop.hbase.io.SizedCellScanner;
+import org.apache.hadoop.hbase.nio.SingleByteBuff;
 import org.apache.hadoop.hbase.testclassification.ClientTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -78,7 +79,8 @@ public class TestCellBlockBuilder {
     CellScanner cellScanner = sized ? getSizedCellScanner(cells)
         : CellUtil.createCellScanner(Arrays.asList(cells).iterator());
     ByteBuffer bb = builder.buildCellBlock(codec, compressor, cellScanner);
-    cellScanner = builder.createCellScannerReusingBuffers(codec, compressor, bb);
+    cellScanner = builder.createCellScannerReusingBuffers(codec, compressor,
+        new SingleByteBuff(bb));
     int i = 0;
     while (cellScanner.advance()) {
       i++;

View File

@@ -36,10 +36,10 @@ public class OffheapKeyValue extends ByteBufferedCell implements ExtendedCell {
   protected final ByteBuffer buf;
   protected final int offset;
   protected final int length;
+  protected final boolean hasTags;
   private final short rowLen;
   private final int keyLen;
   private long seqId = 0;
-  private final boolean hasTags;
   // TODO : See if famLen can be cached or not?

   private static final int FIXED_OVERHEAD = ClassSize.OBJECT + ClassSize.REFERENCE
@@ -57,6 +57,18 @@ public class OffheapKeyValue extends ByteBufferedCell implements ExtendedCell {
     this.seqId = seqId;
   }

+  public OffheapKeyValue(ByteBuffer buf, int offset, int length) {
+    assert buf.isDirect();
+    this.buf = buf;
+    this.offset = offset;
+    this.length = length;
+    rowLen = ByteBufferUtils.toShort(this.buf, this.offset + KeyValue.ROW_OFFSET);
+    keyLen = ByteBufferUtils.toInt(this.buf, this.offset);
+    int tagsLen = this.length
+        - (this.keyLen + getValueLength() + KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE);
+    this.hasTags = tagsLen > 0;
+  }
+
   @Override
   public byte[] getRowArray() {
     return CellUtil.cloneRow(this);
@@ -265,16 +277,19 @@ public class OffheapKeyValue extends ByteBufferedCell implements ExtendedCell {
   @Override
   public void setTimestamp(long ts) throws IOException {
-    // This Cell implementation is not yet used in write path.
-    // TODO when doing HBASE-15179
-    throw new UnsupportedOperationException();
+    ByteBufferUtils.copyFromArrayToBuffer(this.buf, this.getTimestampOffset(), Bytes.toBytes(ts),
+        0, Bytes.SIZEOF_LONG);
+  }
+
+  private int getTimestampOffset() {
+    return this.offset + KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE + this.keyLen
+        - KeyValue.TIMESTAMP_TYPE_SIZE;
   }

   @Override
   public void setTimestamp(byte[] ts, int tsOffset) throws IOException {
-    // This Cell implementation is not yet used in write path.
-    // TODO when doing HBASE-15179
-    throw new UnsupportedOperationException();
+    ByteBufferUtils.copyFromArrayToBuffer(this.buf, this.getTimestampOffset(), ts, tsOffset,
+        Bytes.SIZEOF_LONG);
   }

   @Override
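The new getTimestampOffset() leans on the KeyValue serialized layout. A hedged reminder of where the arithmetic comes from (the constant names are the ones used in the hunk; the sizes shown are the usual KeyValue ones):

// KeyValue serialization, starting at 'offset':
//   [4-byte keyLen][4-byte valueLen]              <- KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE
//   [key: rowLen, row, famLen, family, qualifier, 8-byte timestamp, 1-byte type]
//   [value][optional tags]
// The timestamp + type block sits at the very end of the key, so:
//   timestampOffset = offset
//       + KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE   // skip the two length ints
//       + keyLen                                  // jump to the end of the key
//       - KeyValue.TIMESTAMP_TYPE_SIZE;           // back up over timestamp (8) + type (1)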

View File

@@ -20,14 +20,14 @@ package org.apache.hadoop.hbase.codec;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
-import java.nio.ByteBuffer;

 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.io.ByteBufferInputStream;
+import org.apache.hadoop.hbase.io.ByteBuffInputStream;
+import org.apache.hadoop.hbase.nio.ByteBuff;
 import org.apache.hadoop.hbase.util.Bytes;

 /**
@@ -118,8 +118,8 @@ public class CellCodec implements Codec {
   }

   @Override
-  public Decoder getDecoder(ByteBuffer buf) {
-    return getDecoder(new ByteBufferInputStream(buf));
+  public Decoder getDecoder(ByteBuff buf) {
+    return getDecoder(new ByteBuffInputStream(buf));
   }

   @Override

View File

@@ -20,14 +20,14 @@ package org.apache.hadoop.hbase.codec;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
-import java.nio.ByteBuffer;

 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.io.ByteBufferInputStream;
+import org.apache.hadoop.hbase.io.ByteBuffInputStream;
+import org.apache.hadoop.hbase.nio.ByteBuff;
 import org.apache.hadoop.hbase.util.Bytes;

 /**
@@ -119,8 +119,8 @@ public class CellCodecWithTags implements Codec {
   }

   @Override
-  public Decoder getDecoder(ByteBuffer buf) {
-    return getDecoder(new ByteBufferInputStream(buf));
+  public Decoder getDecoder(ByteBuff buf) {
+    return getDecoder(new ByteBuffInputStream(buf));
   }

   @Override

View File

@@ -19,12 +19,12 @@ package org.apache.hadoop.hbase.codec;
 import java.io.InputStream;
 import java.io.OutputStream;
-import java.nio.ByteBuffer;

 import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.io.CellOutputStream;
+import org.apache.hadoop.hbase.nio.ByteBuff;

 /**
  * Encoder/Decoder for Cell.
@@ -51,6 +51,6 @@ public interface Codec {
   interface Decoder extends CellScanner {};

   Decoder getDecoder(InputStream is);
-  Decoder getDecoder(ByteBuffer buf);
+  Decoder getDecoder(ByteBuff buf);
   Encoder getEncoder(OutputStream os);
 }

View File

@@ -27,8 +27,10 @@ import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.NoTagsKeyValue;
+import org.apache.hadoop.hbase.OffheapKeyValue;
 import org.apache.hadoop.hbase.ShareableMemory;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.nio.ByteBuff;
 import org.apache.hadoop.hbase.util.ByteBufferUtils;
 import org.apache.hadoop.hbase.util.Bytes;

@@ -76,24 +78,28 @@ public class KeyValueCodec implements Codec {
     }
   }

-  public static class ByteBufferedKeyValueDecoder implements Codec.Decoder {
+  public static class ByteBuffKeyValueDecoder implements Codec.Decoder {

-    protected final ByteBuffer buf;
+    protected final ByteBuff buf;
     protected Cell current = null;

-    public ByteBufferedKeyValueDecoder(ByteBuffer buf) {
+    public ByteBuffKeyValueDecoder(ByteBuff buf) {
       this.buf = buf;
     }

     @Override
     public boolean advance() throws IOException {
-      if (this.buf.remaining() <= 0) {
+      if (!this.buf.hasRemaining()) {
         return false;
       }
-      int len = ByteBufferUtils.toInt(buf);
-      assert buf.hasArray();
-      this.current = createCell(buf.array(), buf.arrayOffset() + buf.position(), len);
-      buf.position(buf.position() + len);
+      int len = buf.getInt();
+      ByteBuffer bb = buf.asSubByteBuffer(len);
+      if (bb.isDirect()) {
+        this.current = createCell(bb, bb.position(), len);
+      } else {
+        this.current = createCell(bb.array(), bb.arrayOffset() + bb.position(), len);
+      }
+      buf.skip(len);
       return true;
     }
@@ -106,6 +112,11 @@ public class KeyValueCodec implements Codec {
       return new ShareableMemoryNoTagsKeyValue(buf, offset, len);
     }

+    protected Cell createCell(ByteBuffer bb, int pos, int len) {
+      // We know there is not going to be any tags.
+      return new ShareableMemoryOffheapKeyValue(bb, pos, len, false, 0);
+    }
+
     static class ShareableMemoryKeyValue extends KeyValue implements ShareableMemory {
       public ShareableMemoryKeyValue(byte[] bytes, int offset, int length) {
         super(bytes, offset, length);
@@ -133,6 +144,31 @@ public class KeyValueCodec implements Codec {
         return kv;
       }
     }
+
+    static class ShareableMemoryOffheapKeyValue extends OffheapKeyValue implements ShareableMemory {
+      public ShareableMemoryOffheapKeyValue(ByteBuffer buf, int offset, int length) {
+        super(buf, offset, length);
+      }
+
+      public ShareableMemoryOffheapKeyValue(ByteBuffer buf, int offset, int length, boolean hasTags,
+          long seqId) {
+        super(buf, offset, length, hasTags, seqId);
+      }
+
+      @Override
+      public Cell cloneToCell() {
+        byte[] copy = new byte[this.length];
+        ByteBufferUtils.copyFromBufferToArray(copy, this.buf, this.offset, 0, this.length);
+        KeyValue kv;
+        if (this.hasTags) {
+          kv = new KeyValue(copy, 0, copy.length);
+        } else {
+          kv = new NoTagsKeyValue(copy, 0, copy.length);
+        }
+        kv.setSequenceId(this.getSequenceId());
+        return kv;
+      }
+    }
   }

 /**
@@ -144,8 +180,8 @@ public class KeyValueCodec implements Codec {
   }

   @Override
-  public Decoder getDecoder(ByteBuffer buf) {
-    return new ByteBufferedKeyValueDecoder(buf);
+  public Decoder getDecoder(ByteBuff buf) {
+    return new ByteBuffKeyValueDecoder(buf);
   }

   @Override
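A minimal, hypothetical decode loop over a cell block held in a (possibly off-heap) ByteBuff, showing how the new ByteBuffKeyValueDecoder is reached through the Codec API; KeyValueCodec and SingleByteBuff are from this patch, the filled cell-block buffer is assumed to exist:

import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.codec.KeyValueCodec;
import org.apache.hadoop.hbase.nio.SingleByteBuff;

public class DecodeExample {
  static int countCells(ByteBuffer cellBlock) throws IOException {
    Codec.Decoder decoder = new KeyValueCodec().getDecoder(new SingleByteBuff(cellBlock));
    int count = 0;
    while (decoder.advance()) {
      Cell c = decoder.current(); // backed by the buffer itself when it is direct
      count++;
    }
    return count;
  }
}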

View File

@@ -26,6 +26,7 @@ import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.nio.ByteBuff;
 import org.apache.hadoop.hbase.util.ByteBufferUtils;

 /**
@@ -78,16 +79,21 @@ public class KeyValueCodecWithTags implements Codec {
     }
   }

-  public static class ByteBufferedKeyValueDecoder
-      extends KeyValueCodec.ByteBufferedKeyValueDecoder {
+  public static class ByteBuffKeyValueDecoder extends KeyValueCodec.ByteBuffKeyValueDecoder {

-    public ByteBufferedKeyValueDecoder(ByteBuffer buf) {
+    public ByteBuffKeyValueDecoder(ByteBuff buf) {
       super(buf);
     }

+    @Override
     protected Cell createCell(byte[] buf, int offset, int len) {
       return new ShareableMemoryKeyValue(buf, offset, len);
     }
+
+    @Override
+    protected Cell createCell(ByteBuffer bb, int pos, int len) {
+      return new ShareableMemoryOffheapKeyValue(bb, pos, len);
+    }
   }

 /**
@@ -104,7 +110,7 @@ public class KeyValueCodecWithTags implements Codec {
   }

   @Override
-  public Decoder getDecoder(ByteBuffer buf) {
-    return new ByteBufferedKeyValueDecoder(buf);
+  public Decoder getDecoder(ByteBuff buf) {
+    return new ByteBuffKeyValueDecoder(buf);
   }
 }

View File

@@ -31,7 +31,7 @@ import org.apache.hadoop.hbase.util.Bytes;
  * supports writing ByteBuffer directly to it.
  */
 @InterfaceAudience.Private
-public class ByteArrayOutputStream extends OutputStream implements ByteBufferSupportOutputStream {
+public class ByteArrayOutputStream extends OutputStream implements ByteBufferWriter {

   // Borrowed from openJDK:
   // http://grepcode.com/file/repository.grepcode.com/java/root/jdk/openjdk/8-b132/java/util/ArrayList.java#221

View File

@@ -38,7 +38,7 @@ import org.apache.hadoop.hbase.util.Bytes;
 @InterfaceAudience.Public
 @InterfaceStability.Evolving
 public class ByteBufferOutputStream extends OutputStream
-    implements ByteBufferSupportOutputStream {
+    implements ByteBufferWriter {

   // Borrowed from openJDK:
   // http://grepcode.com/file/repository.grepcode.com/java/root/jdk/openjdk/8-b132/java/util/ArrayList.java#221

View File

@@ -140,7 +140,7 @@ public class ByteBufferPool {
     buffers.offer(buf);
   }

-  int getBufferSize() {
+  public int getBufferSize() {
     return this.bufferSize;
   }

@@ -148,7 +148,7 @@ public class ByteBufferPool {
    * @return Number of free buffers
    */
   @VisibleForTesting
-  int getQueueSize() {
+  public int getQueueSize() {
     return buffers.size();
   }
 }
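The two getters are widened to public so the RPC layer (and tests) can size requests against the pool. A small, hypothetical usage sketch of the pool itself; the constructor arguments and the null check reflect typical usage, not code from this commit:

import java.nio.ByteBuffer;
import org.apache.hadoop.hbase.io.ByteBufferPool;

class ReservoirSketch {
  public static void main(String[] args) {
    // 64 KB buffers, at most 1024 of them kept pooled.
    ByteBufferPool pool = new ByteBufferPool(64 * 1024, 1024);
    ByteBuffer buf = pool.getBuffer(); // may be null if the pool cannot hand one out
    if (buf != null) {
      try {
        // ... fill and drain 'buf' ...
      } finally {
        pool.putbackBuffer(buf); // always return it, or the pool drains over time
      }
    }
  }
}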

View File

@@ -23,29 +23,31 @@ import java.nio.ByteBuffer;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;

 /**
- * Interface adds support for writing {@link ByteBuffer} into OutputStream.
+ * This interface marks a class to support writing ByteBuffers into it.
+ * @see ByteArrayOutputStream
+ * @see ByteBufferOutputStream
  */
 @InterfaceAudience.Private
-public interface ByteBufferSupportOutputStream {
+public interface ByteBufferWriter {

   /**
    * Writes <code>len</code> bytes from the specified ByteBuffer starting at offset <code>off</code>
-   * to this output stream.
    *
    * @param b the data.
    * @param off the start offset in the data.
    * @param len the number of bytes to write.
-   * @exception IOException
-   *              if an I/O error occurs. In particular, an <code>IOException</code> is thrown if
-   *              the output stream is closed.
+   * @exception IOException if an I/O error occurs.
    */
   void write(ByteBuffer b, int off, int len) throws IOException;

   /**
-   * Writes an <code>int</code> to the underlying output stream as four
-   * bytes, high byte first.
+   * Writes an <code>int</code> to the underlying output stream as four bytes, high byte first.
    * @param i the <code>int</code> to write
    * @throws IOException if an I/O error occurs.
    */
+  // This is pure performance oriented API been added here. It has nothing to do with
+  // ByteBuffer and so not fully belong to here. This allows an int to be written at one go instead
+  // of 4 (4 bytes one by one).
+  // TODO remove it from here?
   void writeInt(int i) throws IOException;
 }

View File

@@ -26,13 +26,13 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.util.ByteBufferUtils;

 /**
- * Our extension of DataOutputStream which implements ByteBufferSupportOutputStream
+ * Our extension of DataOutputStream which implements ByteBufferWriter
  */
 @InterfaceAudience.Private
-public class ByteBufferSupportDataOutputStream extends DataOutputStream
-    implements ByteBufferSupportOutputStream {
+public class ByteBufferWriterDataOutputStream extends DataOutputStream
+    implements ByteBufferWriter {

-  public ByteBufferSupportDataOutputStream(OutputStream out) {
+  public ByteBufferWriterDataOutputStream(OutputStream out) {
     super(out);
   }

View File

@@ -0,0 +1,90 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.io.util.StreamUtils;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
/**
* When deal with OutputStream which is not ByteBufferWriter type, wrap it with this class. We will
* have to write offheap ByteBuffer (DBB) data into the OS. This class is having a temp byte array
* to which we can copy the DBB data for writing to the OS.
* <br>
* This is used while writing Cell data to WAL. In case of AsyncWAL, the OS created there is
* ByteBufferWriter. But in case of FSHLog, the OS passed by DFS client, is not of type
* ByteBufferWriter. We will need this temp solution until DFS client supports writing ByteBuffer
* directly to the OS it creates.
* <br>
* Note: This class is not thread safe.
*/
@InterfaceAudience.Private
public class ByteBufferWriterOutputStream extends OutputStream
implements ByteBufferWriter {
private static final int TEMP_BUF_LENGTH = 4 * 1024;
private final OutputStream os;
private byte[] tempBuf = null;
public ByteBufferWriterOutputStream(OutputStream os) {
this.os = os;
}
@Override
public void write(ByteBuffer b, int off, int len) throws IOException {
byte[] buf = null;
if (len > TEMP_BUF_LENGTH) {
buf = new byte[len];
} else {
if (this.tempBuf == null) {
this.tempBuf = new byte[TEMP_BUF_LENGTH];
}
buf = this.tempBuf;
}
ByteBufferUtils.copyFromBufferToArray(buf, b, off, 0, len);
this.os.write(buf, 0, len);
}
@Override
public void writeInt(int i) throws IOException {
StreamUtils.writeInt(this.os, i);
}
@Override
public void write(int b) throws IOException {
this.os.write(b);
}
public void write(byte b[], int off, int len) throws IOException {
this.os.write(b, off, len);
}
@Override
public void flush() throws IOException {
this.os.flush();
}
@Override
public void close() throws IOException {
this.os.close();
}
}
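A small, hypothetical illustration of the wrapper above: 'fileOut' stands in for an OutputStream that does not implement ByteBufferWriter (like the one the DFS client hands back for the WAL); the wrapper lets off-heap ByteBuffer data reach it through the temp byte[]. copyBufferToStream is the ByteBufferUtils helper changed later in this commit:

import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import org.apache.hadoop.hbase.io.ByteBufferWriterOutputStream;
import org.apache.hadoop.hbase.util.ByteBufferUtils;

public class WalWriteSketch {
  static void writeCellData(OutputStream fileOut, ByteBuffer directCellData) throws IOException {
    ByteBufferWriterOutputStream os = new ByteBufferWriterOutputStream(fileOut);
    // copyBufferToStream sees a ByteBufferWriter and calls write(ByteBuffer, off, len)
    // instead of falling back to a byte-by-byte copy.
    ByteBufferUtils.copyBufferToStream(os, directCellData, directCellData.position(),
        directCellData.remaining());
    os.flush();
  }
}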

View File

@@ -17,7 +17,9 @@
  */
 package org.apache.hadoop.hbase.nio;

+import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.nio.channels.ReadableByteChannel;

 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.util.ByteBufferUtils;
@@ -34,7 +36,10 @@ import org.apache.hadoop.io.WritableUtils;
  * helps us in the read path.
  */
 @InterfaceAudience.Private
+// TODO to have another name. This can easily get confused with netty's ByteBuf
 public abstract class ByteBuff {
+
+  private static final int NIO_BUFFER_LIMIT = 64 * 1024; // should not be more than 64KB.

   /**
    * @return this ByteBuff's current position
    */
@@ -355,6 +360,14 @@ public abstract class ByteBuff {
    */
   public abstract long getLongAfterPosition(int offset);

+  /**
+   * Copy the content from this ByteBuff to a byte[].
+   * @return byte[] with the copied contents from this ByteBuff.
+   */
+  public byte[] toBytes() {
+    return toBytes(0, this.limit());
+  }
+
   /**
    * Copy the content from this ByteBuff to a byte[] based on the given offset and
    * length
@@ -389,7 +402,39 @@ public abstract class ByteBuff {
    */
   public abstract ByteBuff put(int offset, ByteBuff src, int srcOffset, int length);

+  /**
+   * Reads bytes from the given channel into this ByteBuff
+   * @param channel
+   * @return The number of bytes read from the channel
+   * @throws IOException
+   */
+  public abstract int read(ReadableByteChannel channel) throws IOException;
+
   // static helper methods
+
+  public static int channelRead(ReadableByteChannel channel, ByteBuffer buf) throws IOException {
+    if (buf.remaining() <= NIO_BUFFER_LIMIT) {
+      return channel.read(buf);
+    }
+    int originalLimit = buf.limit();
+    int initialRemaining = buf.remaining();
+    int ret = 0;
+    while (buf.remaining() > 0) {
+      try {
+        int ioSize = Math.min(buf.remaining(), NIO_BUFFER_LIMIT);
+        buf.limit(buf.position() + ioSize);
+        ret = channel.read(buf);
+        if (ret < ioSize) {
+          break;
+        }
+      } finally {
+        buf.limit(originalLimit);
+      }
+    }
+    int nBytes = initialRemaining - buf.remaining();
+    return (nBytes > 0) ? nBytes : ret;
+  }
+
   /**
    * Read integer from ByteBuff coded in 7 bits and increment position.
    * @return Read integer.
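The 64 KB NIO_BUFFER_LIMIT follows the same chunking idea Hadoop's IPC code uses: very large single channel reads can make the NIO layer allocate correspondingly large temporary direct buffers, so big destination buffers are filled in slices instead. Below is a hypothetical sketch of the new read(ReadableByteChannel) API; it assumes a blocking channel (on a non-blocking channel a 0-byte read would need extra handling) and a known request length:

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.nio.SingleByteBuff;

public class ChannelReadSketch {
  static ByteBuff readFully(ReadableByteChannel channel, int dataLength) throws IOException {
    ByteBuff data = new SingleByteBuff(ByteBuffer.allocateDirect(dataLength));
    while (data.hasRemaining()) {
      if (data.read(channel) < 0) {
        throw new IOException("Channel closed before " + dataLength + " bytes were read");
      }
    }
    data.rewind();
    return data;
  }
}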

View File

@@ -17,16 +17,20 @@
  */
 package org.apache.hadoop.hbase.nio;

+import java.io.IOException;
 import java.nio.BufferOverflowException;
 import java.nio.BufferUnderflowException;
 import java.nio.ByteBuffer;
 import java.nio.InvalidMarkException;
+import java.nio.channels.ReadableByteChannel;

 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.util.ByteBufferUtils;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ObjectIntPair;

+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * Provides a unified view of all the underlying ByteBuffers and will look as if a bigger
  * sequential buffer. This class provides similar APIs as in {@link ByteBuffer} to put/get int,
@@ -1070,6 +1074,28 @@ public class MultiByteBuff extends ByteBuff {
     return output;
   }

+  @Override
+  public int read(ReadableByteChannel channel) throws IOException {
+    int total = 0;
+    while (true) {
+      // Read max possible into the current BB
+      int len = channelRead(channel, this.curItem);
+      if (len > 0)
+        total += len;
+      if (this.curItem.hasRemaining()) {
+        // We were not able to read enough to fill the current BB itself. Means there is no point in
+        // doing more reads from Channel. Only this much there for now.
+        break;
+      } else {
+        if (this.curItemIndex >= this.limitedItemIndex)
+          break;
+        this.curItemIndex++;
+        this.curItem = this.items[this.curItemIndex];
+      }
+    }
+    return total;
+  }
+
   @Override
   public boolean equals(Object obj) {
     if (!(obj instanceof MultiByteBuff)) return false;
@@ -1091,4 +1117,12 @@ public class MultiByteBuff extends ByteBuff {
     }
     return hash;
   }
+
+  /**
+   * @return the ByteBuffers which this wraps.
+   */
+  @VisibleForTesting
+  public ByteBuffer[] getEnclosingByteBuffers() {
+    return this.items;
+  }
 }

View File

@@ -17,7 +17,9 @@
  */
 package org.apache.hadoop.hbase.nio;

+import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.nio.channels.ReadableByteChannel;

 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.util.ByteBufferUtils;
@@ -25,6 +27,8 @@ import org.apache.hadoop.hbase.util.ObjectIntPair;
 import org.apache.hadoop.hbase.util.UnsafeAccess;
 import org.apache.hadoop.hbase.util.UnsafeAvailChecker;

+import com.google.common.annotations.VisibleForTesting;
+
 import sun.nio.ch.DirectBuffer;

 /**
@@ -312,6 +316,11 @@ public class SingleByteBuff extends ByteBuff {
     ByteBufferUtils.copyFromBufferToBuffer(buf, out, sourceOffset, length);
   }

+  @Override
+  public int read(ReadableByteChannel channel) throws IOException {
+    return channelRead(channel, buf);
+  }
+
   @Override
   public boolean equals(Object obj) {
     if(!(obj instanceof SingleByteBuff)) return false;
@@ -326,7 +335,8 @@ public class SingleByteBuff extends ByteBuff {
   /**
    * @return the ByteBuffer which this wraps.
    */
-  ByteBuffer getEnclosingByteBuffer() {
+  @VisibleForTesting
+  public ByteBuffer getEnclosingByteBuffer() {
     return this.buf;
   }
 }

View File

@@ -29,7 +29,7 @@ import java.util.Arrays;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
-import org.apache.hadoop.hbase.io.ByteBufferSupportOutputStream;
+import org.apache.hadoop.hbase.io.ByteBufferWriter;
 import org.apache.hadoop.hbase.io.util.StreamUtils;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.WritableUtils;
@@ -144,8 +144,8 @@ public final class ByteBufferUtils {
     // We have writeInt in ByteBufferOutputStream so that it can directly write
     // int to underlying
     // ByteBuffer in one step.
-    if (out instanceof ByteBufferSupportOutputStream) {
-      ((ByteBufferSupportOutputStream) out).writeInt(value);
+    if (out instanceof ByteBufferWriter) {
+      ((ByteBufferWriter) out).writeInt(value);
     } else {
       StreamUtils.writeInt(out, value);
     }
@@ -182,8 +182,8 @@ public final class ByteBufferUtils {
    */
   public static void copyBufferToStream(OutputStream out, ByteBuffer in,
       int offset, int length) throws IOException {
-    if (out instanceof ByteBufferSupportOutputStream) {
-      ((ByteBufferSupportOutputStream) out).write(in, offset, length);
+    if (out instanceof ByteBufferWriter) {
+      ((ByteBufferWriter) out).write(in, offset, length);
     } else if (in.hasArray()) {
       out.write(in.array(), in.arrayOffset() + offset, length);
     } else {

View File

@@ -78,7 +78,7 @@ public class TestTagCompressionContext {
   @Test
   public void testCompressUncompressTagsWithOffheapKeyValue1() throws Exception {
     ByteArrayOutputStream baos = new ByteArrayOutputStream();
-    DataOutputStream daos = new ByteBufferSupportDataOutputStream(baos);
+    DataOutputStream daos = new ByteBufferWriterDataOutputStream(baos);
     TagCompressionContext context = new TagCompressionContext(LRUDictionary.class, Byte.MAX_VALUE);
     ByteBufferedCell kv1 = (ByteBufferedCell)createOffheapKVWithTags(2);
     int tagsLength1 = kv1.getTagsLength();
@@ -127,7 +127,7 @@ public class TestTagCompressionContext {
   @Test
   public void testCompressUncompressTagsWithOffheapKeyValue2() throws Exception {
     ByteArrayOutputStream baos = new ByteArrayOutputStream();
-    DataOutputStream daos = new ByteBufferSupportDataOutputStream(baos);
+    DataOutputStream daos = new ByteBufferWriterDataOutputStream(baos);
     TagCompressionContext context = new TagCompressionContext(LRUDictionary.class, Byte.MAX_VALUE);
     ByteBufferedCell kv1 = (ByteBufferedCell)createOffheapKVWithTags(1);
     int tagsLength1 = kv1.getTagsLength();

View File

@@ -20,10 +20,10 @@ package org.apache.hadoop.hbase.codec;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
-import java.nio.ByteBuffer;

 import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.io.ByteBufferInputStream;
+import org.apache.hadoop.hbase.io.ByteBuffInputStream;
+import org.apache.hadoop.hbase.nio.ByteBuff;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
@@ -83,8 +83,8 @@ public class MessageCodec implements Codec {
   }

   @Override
-  public Decoder getDecoder(ByteBuffer buf) {
-    return getDecoder(new ByteBufferInputStream(buf));
+  public Decoder getDecoder(ByteBuff buf) {
+    return getDecoder(new ByteBuffInputStream(buf));
   }

   @Override

View File

@@ -37,7 +37,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.fs.HFileSystem;
 import org.apache.hadoop.hbase.io.ByteArrayOutputStream;
 import org.apache.hadoop.hbase.io.ByteBuffInputStream;
-import org.apache.hadoop.hbase.io.ByteBufferSupportDataOutputStream;
+import org.apache.hadoop.hbase.io.ByteBufferWriterDataOutputStream;
 import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
 import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext;
@@ -962,7 +962,7 @@ public class HFileBlock implements Cacheable {
       state = State.WRITING;

       // We will compress it later in finishBlock()
-      userDataStream = new ByteBufferSupportDataOutputStream(baosInMemory);
+      userDataStream = new ByteBufferWriterDataOutputStream(baosInMemory);
       if (newBlockType == BlockType.DATA) {
         this.dataBlockEncoder.startBlockEncoding(dataBlockEncodingCtx, userDataStream);
       }

View File

@@ -19,8 +19,6 @@ package org.apache.hadoop.hbase.ipc;
 import java.net.InetSocketAddress;
 import java.nio.channels.ClosedChannelException;

-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.CallDroppedException;
 import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
@@ -45,7 +43,6 @@ import org.apache.hadoop.hbase.shaded.com.google.protobuf.Message;
 @InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
 @InterfaceStability.Evolving
 public class CallRunner {
-  private static final Log LOG = LogFactory.getLog(CallRunner.class);

   private static final CallDroppedException CALL_DROPPED_EXCEPTION
     = new CallDroppedException();
@@ -143,6 +140,8 @@ public class CallRunner {
           sucessful = true;
         }
       }
+      // return back the RPC request read BB we can do here. It is done by now.
+      call.cleanup();
       // Set the response
       Message param = resultPair != null ? resultPair.getFirst() : null;
       CellScanner cells = resultPair != null ? resultPair.getSecond() : null;

View File

@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.ipc;
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION;

+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;

 import java.io.ByteArrayInputStream;
@@ -99,6 +100,9 @@ import org.apache.hadoop.hbase.io.ByteBufferPool;
 import org.apache.hadoop.hbase.io.crypto.aes.CryptoAES;
 import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
+import org.apache.hadoop.hbase.nio.ByteBuff;
+import org.apache.hadoop.hbase.nio.MultiByteBuff;
+import org.apache.hadoop.hbase.nio.SingleByteBuff;
 import org.apache.hadoop.hbase.regionserver.RSRpcServices;
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
@@ -146,6 +150,7 @@ import org.codehaus.jackson.map.ObjectMapper;
 import org.apache.hadoop.hbase.shaded.com.google.protobuf.ByteString;
 import org.apache.hadoop.hbase.shaded.com.google.protobuf.BlockingService;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.ByteInput;
 import org.apache.hadoop.hbase.shaded.com.google.protobuf.CodedInputStream;
 import org.apache.hadoop.hbase.shaded.com.google.protobuf.CodedOutputStream;
 import org.apache.hadoop.hbase.shaded.com.google.protobuf.Descriptors.MethodDescriptor;
@@ -304,6 +309,10 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
   private UserProvider userProvider;

   private final ByteBufferPool reservoir;
+  // The requests and response will use buffers from ByteBufferPool, when the size of the
+  // request/response is at least this size.
+  // We make this to be 1/6th of the pool buffer size.
+  private final int minSizeForReservoirUse;

   private volatile boolean allowFallbackToSimpleAuth;
@@ -344,10 +353,11 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
     protected boolean isError;
     protected TraceInfo tinfo;
     private ByteBufferListOutputStream cellBlockStream = null;
+    private CallCleanup reqCleanup = null;

     private User user;
     private InetAddress remoteAddress;
-    private RpcCallback callback;
+    private RpcCallback rpcCallback;

     private long responseCellSize = 0;
     private long responseBlockSize = 0;
@@ -357,7 +367,8 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
       justification="Can't figure why this complaint is happening... see below")
     Call(int id, final BlockingService service, final MethodDescriptor md, RequestHeader header,
         Message param, CellScanner cellScanner, Connection connection, Responder responder,
-        long size, TraceInfo tinfo, final InetAddress remoteAddress, int timeout) {
+        long size, TraceInfo tinfo, final InetAddress remoteAddress, int timeout,
+        CallCleanup reqCleanup) {
       this.id = id;
       this.service = service;
       this.md = md;
@@ -377,6 +388,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
           connection == null? null: connection.retryImmediatelySupported;
       this.timeout = timeout;
       this.deadline = this.timeout > 0 ? this.timestamp + this.timeout : Long.MAX_VALUE;
+      this.reqCleanup = reqCleanup;
     }

     /**
@@ -391,9 +403,18 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
         // got from pool.
         this.cellBlockStream = null;
       }
+      cleanup();// If the call was run successfuly, we might have already returned the
+                // BB back to pool. No worries..Then inputCellBlock will be null
       this.connection.decRpcCount();  // Say that we're done with this call.
     }

+    protected void cleanup() {
+      if (this.reqCleanup != null) {
+        this.reqCleanup.run();
+        this.reqCleanup = null;
+      }
+    }
+
     @Override
     public String toString() {
       return toShortString() + " param: " +
@@ -515,9 +536,9 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
       this.response = bc;
       // Once a response message is created and set to this.response, this Call can be treated as
       // done. The Responder thread will do the n/w write of this message back to client.
-      if (this.callback != null) {
+      if (this.rpcCallback != null) {
         try {
-          this.callback.run();
+          this.rpcCallback.run();
         } catch (Exception e) {
           // Don't allow any exception here to kill this handler thread.
           LOG.warn("Exception while running the Rpc Callback.", e);
@@ -722,7 +743,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
     @Override
     public synchronized void setCallBack(RpcCallback callback) {
-      this.callback = callback;
+      this.rpcCallback = callback;
     }

     @Override
@@ -731,6 +752,11 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
     }
   }

+  @FunctionalInterface
+  static interface CallCleanup {
+    void run();
+  }
+
   /** Listens on the socket.  Creates jobs for the handler threads*/
   private class Listener extends Thread {
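CallCleanup is just a deferred "give the request buffers back" hook. A hypothetical fragment (assumed to sit inside RpcServer, since CallCleanup is package-private) showing the intent; the real wiring happens via allocateByteBuffToReadInto further down, and 'reservoir' is the server's ByteBufferPool:

ByteBuffer pooled = reservoir.getBuffer();
if (pooled != null) {
  CallCleanup cleanup = () -> reservoir.putbackBuffer(pooled);
  // ... read the request into 'pooled' and carry 'cleanup' along with the Call ...
  // Later, Call.cleanup() (or CallRunner, once the request is processed) runs it
  // exactly once, returning the buffer to the pool.
  cleanup.run();
}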
@@ -1289,7 +1315,8 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
     // If the connection header has been read or not.
     private boolean connectionHeaderRead = false;
     protected SocketChannel channel;
-    private ByteBuffer data;
+    private ByteBuff data;
+    private CallCleanup callCleanup;
     private ByteBuffer dataLengthBuffer;
     protected final ConcurrentLinkedDeque<Call> responseQueue = new ConcurrentLinkedDeque<Call>();
     private final Lock responseWriteLock = new ReentrantLock();
@@ -1327,17 +1354,17 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
     // Fake 'call' for failed authorization response
     private static final int AUTHORIZATION_FAILED_CALLID = -1;
     private final Call authFailedCall = new Call(AUTHORIZATION_FAILED_CALLID, null, null, null,
-        null, null, this, null, 0, null, null, 0);
+        null, null, this, null, 0, null, null, 0, null);
     private ByteArrayOutputStream authFailedResponse =
         new ByteArrayOutputStream();
     // Fake 'call' for SASL context setup
     private static final int SASL_CALLID = -33;
     private final Call saslCall = new Call(SASL_CALLID, null, null, null, null, null, this, null,
-        0, null, null, 0);
+        0, null, null, 0, null);
     // Fake 'call' for connection header response
     private static final int CONNECTION_HEADER_RESPONSE_CALLID = -34;
     private final Call setConnectionHeaderResponseCall = new Call(CONNECTION_HEADER_RESPONSE_CALLID,
-        null, null, null, null, null, this, null, 0, null, null, 0);
+        null, null, null, null, null, this, null, 0, null, null, 0, null);
     // was authentication allowed with a fallback to simple auth
     private boolean authenticatedWithFallback;
@@ -1352,6 +1379,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
       this.channel = channel;
       this.lastContact = lastContact;
       this.data = null;
+      this.callCleanup = null;
       this.dataLengthBuffer = ByteBuffer.allocate(4);
       this.socket = channel.socket();
       this.addr = socket.getInetAddress();
@@ -1437,7 +1465,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
       return authorizedUgi;
     }

-    private void saslReadAndProcess(ByteBuffer saslToken) throws IOException,
+    private void saslReadAndProcess(ByteBuff saslToken) throws IOException,
         InterruptedException {
       if (saslContextEstablished) {
         if (LOG.isTraceEnabled())
@@ -1447,13 +1475,13 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
         if (!useWrap) {
           processOneRpc(saslToken);
         } else {
-          byte[] b = saslToken.array();
+          byte[] b = saslToken.hasArray() ? saslToken.array() : saslToken.toBytes();
           byte [] plaintextData;
           if (useCryptoAesWrap) {
             // unwrap with CryptoAES
-            plaintextData = cryptoAES.unwrap(b, saslToken.position(), saslToken.limit());
+            plaintextData = cryptoAES.unwrap(b, 0, b.length);
           } else {
-            plaintextData = saslServer.unwrap(b, saslToken.position(), saslToken.limit());
+            plaintextData = saslServer.unwrap(b, 0, b.length);
           }
           processUnwrappedData(plaintextData);
         }
@@ -1506,7 +1534,8 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
             LOG.debug("Have read input token of size " + saslToken.limit()
                 + " for processing by saslServer.evaluateResponse()");
           }
-          replyToken = saslServer.evaluateResponse(saslToken.array());
+          replyToken = saslServer
+              .evaluateResponse(saslToken.hasArray() ? saslToken.array() : saslToken.toBytes());
         } catch (IOException e) {
           IOException sendToClient = e;
           Throwable cause = e;
@ -1759,7 +1788,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
// Notify the client about the offending request // Notify the client about the offending request
Call reqTooBig = new Call(header.getCallId(), this.service, null, null, null, Call reqTooBig = new Call(header.getCallId(), this.service, null, null, null,
null, this, responder, 0, null, this.addr,0); null, this, responder, 0, null, this.addr, 0, null);
metrics.exception(REQUEST_TOO_BIG_EXCEPTION); metrics.exception(REQUEST_TOO_BIG_EXCEPTION);
// Make sure the client recognizes the underlying exception // Make sure the client recognizes the underlying exception
// Otherwise, throw a DoNotRetryIOException. // Otherwise, throw a DoNotRetryIOException.
@ -1779,7 +1808,11 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
return -1; return -1;
} }
data = ByteBuffer.allocate(dataLength); // Initialize this.data with a ByteBuff.
// This call will allocate a ByteBuff to read the request into and assign it to this.data.
// When we use buffer(s) from the pool, it will also create a CallCleanup instance and
// assign it to this.callCleanup.
initByteBuffToReadInto(dataLength);
// Increment the rpc count. This counter will be decreased when we write // Increment the rpc count. This counter will be decreased when we write
// the response. If we want the connection to be detected as idle properly, we // the response. If we want the connection to be detected as idle properly, we
@ -1787,7 +1820,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
incRpcCount(); incRpcCount();
} }
count = channelRead(channel, data); count = channelDataRead(channel, data);
if (count >= 0 && data.remaining() == 0) { // count==0 if dataLength == 0 if (count >= 0 && data.remaining() == 0) { // count==0 if dataLength == 0
process(); process();
@ -1796,11 +1829,41 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
return count; return count;
} }
// Creates the ByteBuff and CallCleanup and assigns them to the Connection instance.
private void initByteBuffToReadInto(int length) {
// We create on-demand, on-heap buffers and read into those when
// 1. ByteBufferPool is not in place.
// 2. The size of the request is very small. Using a large (64 KB) buffer from the pool would
// be wasteful then. Also, if all requests were of this size, we would keep creating larger
// buffers and pooling them permanently. This includes Scan/Get requests and DDL kind of
// requests like RegionOpen.
// 3. It is an initial handshake signal or initial connection request. Anyway, condition 2
// itself will match then.
// 4. SASL use is ON.
if (reservoir == null || skipInitialSaslHandshake || !connectionHeaderRead || useSasl
|| length < minSizeForReservoirUse) {
this.data = new SingleByteBuff(ByteBuffer.allocate(length));
} else {
Pair<ByteBuff, CallCleanup> pair = RpcServer.allocateByteBuffToReadInto(reservoir,
minSizeForReservoirUse, length);
this.data = pair.getFirst();
this.callCleanup = pair.getSecond();
}
}
protected int channelDataRead(ReadableByteChannel channel, ByteBuff buf) throws IOException {
int count = buf.read(channel);
if (count > 0) {
metrics.receivedBytes(count);
}
return count;
}
/** /**
* Process the data buffer and clean the connection state for the next call. * Process the data buffer and clean the connection state for the next call.
*/ */
private void process() throws IOException, InterruptedException { private void process() throws IOException, InterruptedException {
data.flip(); data.rewind();
try { try {
if (skipInitialSaslHandshake) { if (skipInitialSaslHandshake) {
skipInitialSaslHandshake = false; skipInitialSaslHandshake = false;
@ -1816,6 +1879,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
} finally { } finally {
dataLengthBuffer.clear(); // Clean for the next call dataLengthBuffer.clear(); // Clean for the next call
data = null; // For the GC data = null; // For the GC
this.callCleanup = null;
} }
} }
@ -1831,7 +1895,8 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
private int doBadPreambleHandling(final String msg, final Exception e) throws IOException { private int doBadPreambleHandling(final String msg, final Exception e) throws IOException {
LOG.warn(msg); LOG.warn(msg);
Call fakeCall = new Call(-1, null, null, null, null, null, this, responder, -1, null, null,0); Call fakeCall = new Call(-1, null, null, null, null, null, this, responder, -1, null, null, 0,
null);
setupResponse(null, fakeCall, e, msg); setupResponse(null, fakeCall, e, msg);
responder.doRespond(fakeCall); responder.doRespond(fakeCall);
// Returning -1 closes out the connection. // Returning -1 closes out the connection.
@ -1839,9 +1904,15 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
} }
// Reads the connection header following version // Reads the connection header following version
private void processConnectionHeader(ByteBuffer buf) throws IOException { private void processConnectionHeader(ByteBuff buf) throws IOException {
this.connectionHeader = ConnectionHeader.parseFrom( if (buf.hasArray()) {
new ByteBufferInputStream(buf)); this.connectionHeader = ConnectionHeader.parseFrom(buf.array());
} else {
CodedInputStream cis = CodedInputStream
.newInstance(new ByteBuffByteInput(buf, 0, buf.limit()), true);
cis.enableAliasing(true);
this.connectionHeader = ConnectionHeader.parseFrom(cis);
}
String serviceName = connectionHeader.getServiceName(); String serviceName = connectionHeader.getServiceName();
if (serviceName == null) throw new EmptyServiceNameException(); if (serviceName == null) throw new EmptyServiceNameException();
this.service = getService(services, serviceName); this.service = getService(services, serviceName);
@ -2043,13 +2114,13 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
if (unwrappedData.remaining() == 0) { if (unwrappedData.remaining() == 0) {
unwrappedDataLengthBuffer.clear(); unwrappedDataLengthBuffer.clear();
unwrappedData.flip(); unwrappedData.flip();
processOneRpc(unwrappedData); processOneRpc(new SingleByteBuff(unwrappedData));
unwrappedData = null; unwrappedData = null;
} }
} }
} }
private void processOneRpc(ByteBuffer buf) throws IOException, InterruptedException { private void processOneRpc(ByteBuff buf) throws IOException, InterruptedException {
if (connectionHeaderRead) { if (connectionHeaderRead) {
processRequest(buf); processRequest(buf);
} else { } else {
@ -2071,12 +2142,18 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
* @throws IOException * @throws IOException
* @throws InterruptedException * @throws InterruptedException
*/ */
protected void processRequest(ByteBuffer buf) throws IOException, InterruptedException { protected void processRequest(ByteBuff buf) throws IOException, InterruptedException {
long totalRequestSize = buf.limit(); long totalRequestSize = buf.limit();
int offset = 0; int offset = 0;
// Here we read in the header. We avoid having pb // Here we read in the header. We avoid having pb
// do its default 4k allocation for CodedInputStream. We force it to use backing array. // do its default 4k allocation for CodedInputStream. We force it to use backing array.
CodedInputStream cis = CodedInputStream.newInstance(buf.array(), offset, buf.limit()); CodedInputStream cis;
if (buf.hasArray()) {
cis = CodedInputStream.newInstance(buf.array(), offset, buf.limit());
} else {
cis = CodedInputStream.newInstance(new ByteBuffByteInput(buf, 0, buf.limit()), true);
cis.enableAliasing(true);
}
int headerSize = cis.readRawVarint32(); int headerSize = cis.readRawVarint32();
offset = cis.getTotalBytesRead(); offset = cis.getTotalBytesRead();
Message.Builder builder = RequestHeader.newBuilder(); Message.Builder builder = RequestHeader.newBuilder();
@ -2093,7 +2170,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
if ((totalRequestSize + callQueueSizeInBytes.sum()) > maxQueueSizeInBytes) { if ((totalRequestSize + callQueueSizeInBytes.sum()) > maxQueueSizeInBytes) {
final Call callTooBig = final Call callTooBig =
new Call(id, this.service, null, null, null, null, this, new Call(id, this.service, null, null, null, null, this,
responder, totalRequestSize, null, null, 0); responder, totalRequestSize, null, null, 0, null);
ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream(); ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
metrics.exception(CALL_QUEUE_TOO_BIG_EXCEPTION); metrics.exception(CALL_QUEUE_TOO_BIG_EXCEPTION);
setupResponse(responseBuffer, callTooBig, CALL_QUEUE_TOO_BIG_EXCEPTION, setupResponse(responseBuffer, callTooBig, CALL_QUEUE_TOO_BIG_EXCEPTION,
@ -2127,7 +2204,10 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
} }
if (header.hasCellBlockMeta()) { if (header.hasCellBlockMeta()) {
buf.position(offset); buf.position(offset);
cellScanner = cellBlockBuilder.createCellScannerReusingBuffers(this.codec, this.compressionCodec, buf); ByteBuff dup = buf.duplicate();
dup.limit(offset + header.getCellBlockMeta().getLength());
cellScanner = cellBlockBuilder.createCellScannerReusingBuffers(this.codec,
this.compressionCodec, dup);
} }
} catch (Throwable t) { } catch (Throwable t) {
InetSocketAddress address = getListenerAddress(); InetSocketAddress address = getListenerAddress();
@ -2148,7 +2228,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
final Call readParamsFailedCall = final Call readParamsFailedCall =
new Call(id, this.service, null, null, null, null, this, new Call(id, this.service, null, null, null, null, this,
responder, totalRequestSize, null, null, 0); responder, totalRequestSize, null, null, 0, null);
ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream(); ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
setupResponse(responseBuffer, readParamsFailedCall, t, setupResponse(responseBuffer, readParamsFailedCall, t,
msg + "; " + t.getMessage()); msg + "; " + t.getMessage());
@ -2164,7 +2244,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
timeout = Math.max(minClientRequestTimeout, header.getTimeout()); timeout = Math.max(minClientRequestTimeout, header.getTimeout());
} }
Call call = new Call(id, this.service, md, header, param, cellScanner, this, responder, Call call = new Call(id, this.service, md, header, param, cellScanner, this, responder,
totalRequestSize, traceInfo, this.addr, timeout); totalRequestSize, traceInfo, this.addr, timeout, this.callCleanup);
if (!scheduler.dispatch(new CallRunner(RpcServer.this, call))) { if (!scheduler.dispatch(new CallRunner(RpcServer.this, call))) {
callQueueSizeInBytes.add(-1 * call.getSize()); callQueueSizeInBytes.add(-1 * call.getSize());
@ -2211,6 +2291,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
protected synchronized void close() { protected synchronized void close() {
disposeSasl(); disposeSasl();
data = null; data = null;
callCleanup = null;
if (!channel.isOpen()) if (!channel.isOpen())
return; return;
try {socket.shutdownOutput();} catch(Exception ignored) { try {socket.shutdownOutput();} catch(Exception ignored) {
@ -2301,8 +2382,10 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
conf.getInt(ByteBufferPool.MAX_POOL_SIZE_KEY, conf.getInt(ByteBufferPool.MAX_POOL_SIZE_KEY,
conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT, conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT) * 2)); HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT) * 2));
this.minSizeForReservoirUse = getMinSizeForReservoirUse(this.reservoir);
} else { } else {
reservoir = null; reservoir = null;
this.minSizeForReservoirUse = Integer.MAX_VALUE;// reservoir itself not in place.
} }
this.server = server; this.server = server;
this.services = services; this.services = services;
@ -2347,6 +2430,11 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
this.scheduler.init(new RpcSchedulerContext(this)); this.scheduler.init(new RpcSchedulerContext(this));
} }
@VisibleForTesting
static int getMinSizeForReservoirUse(ByteBufferPool pool) {
return pool.getBufferSize() / 6;
}
@Override @Override
public void onConfigurationChange(Configuration newConf) { public void onConfigurationChange(Configuration newConf) {
initReconfigurable(newConf); initReconfigurable(newConf);
@ -2754,6 +2842,55 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
return (nBytes > 0) ? nBytes : ret; return (nBytes > 0) ? nBytes : ret;
} }
/**
* This is extracted to a static method for better unit testing. We try to get buffer(s) from the
* pool as much as possible.
*
* @param pool The ByteBufferPool to use
* @param minSizeForPoolUse Only for requests of at least this size do we try to use the pool.
* Any buffer need below this size gets an on-heap ByteBuffer.
* @param reqLen Number of bytes in the request
*/
@VisibleForTesting
static Pair<ByteBuff, CallCleanup> allocateByteBuffToReadInto(ByteBufferPool pool,
int minSizeForPoolUse, int reqLen) {
ByteBuff resultBuf;
List<ByteBuffer> bbs = new ArrayList<ByteBuffer>((reqLen / pool.getBufferSize()) + 1);
int remain = reqLen;
ByteBuffer buf = null;
while (remain >= minSizeForPoolUse && (buf = pool.getBuffer()) != null) {
bbs.add(buf);
remain -= pool.getBufferSize();
}
ByteBuffer[] bufsFromPool = null;
if (bbs.size() > 0) {
bufsFromPool = new ByteBuffer[bbs.size()];
bbs.toArray(bufsFromPool);
}
if (remain > 0) {
bbs.add(ByteBuffer.allocate(remain));
}
if (bbs.size() > 1) {
ByteBuffer[] items = new ByteBuffer[bbs.size()];
bbs.toArray(items);
resultBuf = new MultiByteBuff(items);
} else {
// We are backed by a single BB
resultBuf = new SingleByteBuff(bbs.get(0));
}
resultBuf.limit(reqLen);
if (bufsFromPool != null) {
final ByteBuffer[] bufsFromPoolFinal = bufsFromPool;
return new Pair<ByteBuff, RpcServer.CallCleanup>(resultBuf, () -> {
// Put all the BBs back into the pool
for (int i = 0; i < bufsFromPoolFinal.length; i++) {
pool.putbackBuffer(bufsFromPoolFinal[i]);
}
});
}
return new Pair<ByteBuff, RpcServer.CallCleanup>(resultBuf, null);
}
/** /**
* Needed for features such as delayed calls. We need to be able to store the current call * Needed for features such as delayed calls. We need to be able to store the current call
* so that we can complete it later or ask questions of what is supported by the current ongoing * so that we can complete it later or ask questions of what is supported by the current ongoing
@ -3054,4 +3191,44 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
idleScanTimer.schedule(idleScanTask, idleScanInterval); idleScanTimer.schedule(idleScanTask, idleScanInterval);
} }
} }
}
private static class ByteBuffByteInput extends ByteInput {
private ByteBuff buf;
private int offset;
private int length;
ByteBuffByteInput(ByteBuff buf, int offset, int length) {
this.buf = buf;
this.offset = offset;
this.length = length;
}
@Override
public byte read(int offset) {
return this.buf.get(getAbsoluteOffset(offset));
}
private int getAbsoluteOffset(int offset) {
return this.offset + offset;
}
@Override
public int read(int offset, byte[] out, int outOffset, int len) {
this.buf.get(getAbsoluteOffset(offset), out, outOffset, len);
return len;
}
@Override
public int read(int offset, ByteBuffer out) {
int len = out.remaining();
this.buf.get(out, getAbsoluteOffset(offset), len);
return len;
}
@Override
public int size() {
return this.length;
}
}
}

View File

@ -5201,9 +5201,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
*/ */
private HStore getHStore(Cell cell) { private HStore getHStore(Cell cell) {
for (Map.Entry<byte[], Store> famStore : stores.entrySet()) { for (Map.Entry<byte[], Store> famStore : stores.entrySet()) {
if (Bytes.equals( if (CellUtil.matchingFamily(cell, famStore.getKey(), 0, famStore.getKey().length)) {
cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(),
famStore.getKey(), 0, famStore.getKey().length)) {
return (HStore) famStore.getValue(); return (HStore) famStore.getValue();
} }
} }

View File

@ -35,7 +35,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.io.ByteBufferSupportOutputStream; import org.apache.hadoop.hbase.io.ByteBufferWriter;
import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutput; import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutput;
import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutputHelper; import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutputHelper;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALHeader; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALHeader;
@ -57,7 +57,7 @@ public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter
private AsyncFSOutput output; private AsyncFSOutput output;
private static final class OutputStreamWrapper extends OutputStream private static final class OutputStreamWrapper extends OutputStream
implements ByteBufferSupportOutputStream { implements ByteBufferWriter {
private final AsyncFSOutput out; private final AsyncFSOutput out;

View File

@ -21,7 +21,6 @@ import java.io.ByteArrayOutputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.OutputStream; import java.io.OutputStream;
import java.nio.ByteBuffer;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.Cell;
@ -33,9 +32,12 @@ import org.apache.hadoop.hbase.codec.BaseDecoder;
import org.apache.hadoop.hbase.codec.BaseEncoder; import org.apache.hadoop.hbase.codec.BaseEncoder;
import org.apache.hadoop.hbase.codec.Codec; import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.codec.KeyValueCodecWithTags; import org.apache.hadoop.hbase.codec.KeyValueCodecWithTags;
import org.apache.hadoop.hbase.io.ByteBufferInputStream; import org.apache.hadoop.hbase.io.ByteBuffInputStream;
import org.apache.hadoop.hbase.io.ByteBufferWriter;
import org.apache.hadoop.hbase.io.ByteBufferWriterOutputStream;
import org.apache.hadoop.hbase.io.util.Dictionary; import org.apache.hadoop.hbase.io.util.Dictionary;
import org.apache.hadoop.hbase.io.util.StreamUtils; import org.apache.hadoop.hbase.io.util.StreamUtils;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.util.ByteBufferUtils; import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ReflectionUtils; import org.apache.hadoop.hbase.util.ReflectionUtils;
@ -356,14 +358,18 @@ public class WALCellCodec implements Codec {
} }
@Override @Override
public Decoder getDecoder(ByteBuffer buf) { public Decoder getDecoder(ByteBuff buf) {
return getDecoder(new ByteBufferInputStream(buf)); return getDecoder(new ByteBuffInputStream(buf));
} }
@Override @Override
public Encoder getEncoder(OutputStream os) { public Encoder getEncoder(OutputStream os) {
return (compression == null) if (compression == null) {
? new EnsureKvEncoder(os) : new CompressedKvEncoder(os, compression); os = (os instanceof ByteBufferWriter) ? os
: new ByteBufferWriterOutputStream(os);
return new EnsureKvEncoder(os);
}
return new CompressedKvEncoder(os, compression);
} }
public ByteStringCompressor getByteStringCompressor() { public ByteStringCompressor getByteStringCompressor() {

View File

@ -33,7 +33,6 @@ import static org.mockito.internal.verification.VerificationModeFactory.times;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel; import java.nio.channels.SocketChannel;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
@ -47,6 +46,7 @@ import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoRequestProto; import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoRequestProto;
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoResponseProto; import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoResponseProto;
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EmptyRequestProto; import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EmptyRequestProto;
@ -315,7 +315,7 @@ public abstract class AbstractTestIPC {
} }
@Override @Override
protected void processRequest(ByteBuffer buf) throws IOException, InterruptedException { protected void processRequest(ByteBuff buf) throws IOException, InterruptedException {
// this will throw exception after the connection header is read, and an RPC is sent // this will throw exception after the connection header is read, and an RPC is sent
// from client // from client
throw new DoNotRetryIOException("Failing for test"); throw new DoNotRetryIOException("Failing for test");

View File

@ -0,0 +1,140 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.ipc;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.nio.ByteBuffer;
import org.apache.hadoop.hbase.io.ByteBufferPool;
import org.apache.hadoop.hbase.ipc.RpcServer.CallCleanup;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.nio.MultiByteBuff;
import org.apache.hadoop.hbase.nio.SingleByteBuff;
import org.apache.hadoop.hbase.testclassification.RPCTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Pair;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category({ RPCTests.class, SmallTests.class })
public class TestRpcServer {
@Test
public void testAllocateByteBuffToReadInto() throws Exception {
System.out.println(Long.MAX_VALUE);
int maxBuffersInPool = 10;
ByteBufferPool pool = new ByteBufferPool(6 * 1024, maxBuffersInPool);
initPoolWithAllBuffers(pool, maxBuffersInPool);
ByteBuff buff = null;
Pair<ByteBuff, CallCleanup> pair;
// When the request size is less than 1/6th of the pool buffer size, we should use an
// on-demand created on-heap buffer.
pair = RpcServer.allocateByteBuffToReadInto(pool, RpcServer.getMinSizeForReservoirUse(pool),
200);
buff = pair.getFirst();
assertTrue(buff.hasArray());
assertEquals(maxBuffersInPool, pool.getQueueSize());
assertNull(pair.getSecond());
// When the request size is > 1/6th of the pool buffer size.
pair = RpcServer.allocateByteBuffToReadInto(pool, RpcServer.getMinSizeForReservoirUse(pool),
1024);
buff = pair.getFirst();
assertFalse(buff.hasArray());
assertEquals(maxBuffersInPool - 1, pool.getQueueSize());
assertNotNull(pair.getSecond());
pair.getSecond().run();// CallCleanup#run should put back the BB to pool.
assertEquals(maxBuffersInPool, pool.getQueueSize());
// Request size > pool buffer size
pair = RpcServer.allocateByteBuffToReadInto(pool, RpcServer.getMinSizeForReservoirUse(pool),
7 * 1024);
buff = pair.getFirst();
assertFalse(buff.hasArray());
assertTrue(buff instanceof MultiByteBuff);
ByteBuffer[] bbs = ((MultiByteBuff) buff).getEnclosingByteBuffers();
assertEquals(2, bbs.length);
assertTrue(bbs[0].isDirect());
assertTrue(bbs[1].isDirect());
assertEquals(6 * 1024, bbs[0].limit());
assertEquals(1024, bbs[1].limit());
assertEquals(maxBuffersInPool - 2, pool.getQueueSize());
assertNotNull(pair.getSecond());
pair.getSecond().run();// CallCleanup#run should put back the BB to pool.
assertEquals(maxBuffersInPool, pool.getQueueSize());
pair = RpcServer.allocateByteBuffToReadInto(pool, RpcServer.getMinSizeForReservoirUse(pool),
6 * 1024 + 200);
buff = pair.getFirst();
assertFalse(buff.hasArray());
assertTrue(buff instanceof MultiByteBuff);
bbs = ((MultiByteBuff) buff).getEnclosingByteBuffers();
assertEquals(2, bbs.length);
assertTrue(bbs[0].isDirect());
assertFalse(bbs[1].isDirect());
assertEquals(6 * 1024, bbs[0].limit());
assertEquals(200, bbs[1].limit());
assertEquals(maxBuffersInPool - 1, pool.getQueueSize());
assertNotNull(pair.getSecond());
pair.getSecond().run();// CallCleanup#run should put back the BB to pool.
assertEquals(maxBuffersInPool, pool.getQueueSize());
ByteBuffer[] buffers = new ByteBuffer[maxBuffersInPool - 1];
for (int i = 0; i < maxBuffersInPool - 1; i++) {
buffers[i] = pool.getBuffer();
}
pair = RpcServer.allocateByteBuffToReadInto(pool, RpcServer.getMinSizeForReservoirUse(pool),
20 * 1024);
buff = pair.getFirst();
assertFalse(buff.hasArray());
assertTrue(buff instanceof MultiByteBuff);
bbs = ((MultiByteBuff) buff).getEnclosingByteBuffers();
assertEquals(2, bbs.length);
assertTrue(bbs[0].isDirect());
assertFalse(bbs[1].isDirect());
assertEquals(6 * 1024, bbs[0].limit());
assertEquals(14 * 1024, bbs[1].limit());
assertEquals(0, pool.getQueueSize());
assertNotNull(pair.getSecond());
pair.getSecond().run();// CallCleanup#run should put back the BB to pool.
assertEquals(1, pool.getQueueSize());
pool.getBuffer();
pair = RpcServer.allocateByteBuffToReadInto(pool, RpcServer.getMinSizeForReservoirUse(pool),
7 * 1024);
buff = pair.getFirst();
assertTrue(buff.hasArray());
assertTrue(buff instanceof SingleByteBuff);
assertEquals(7 * 1024, ((SingleByteBuff) buff).getEnclosingByteBuffer().limit());
assertNull(pair.getSecond());
}
private void initPoolWithAllBuffers(ByteBufferPool pool, int maxBuffersInPool) {
ByteBuffer[] buffers = new ByteBuffer[maxBuffersInPool];
// Call getBuffer() on the pool 'maxBuffersInPool' times so as to init all the buffers, then put
// them all back. That leaves the pool holding its max #buffers.
for (int i = 0; i < maxBuffersInPool; i++) {
buffers[i] = pool.getBuffer();
}
for (ByteBuffer buf : buffers) {
pool.putbackBuffer(buf);
}
}
}