HBASE-21916 Abstract an ByteBuffAllocator to allocate/free ByteBuffer in ByteBufferPool

This commit is contained in:
huzheng 2019-02-16 17:16:09 +08:00
parent 974e1086d7
commit c19592a971
34 changed files with 975 additions and 698 deletions

View File

@ -33,6 +33,7 @@ import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -41,7 +42,6 @@ import org.apache.hadoop.hbase.io.ByteBuffInputStream;
import org.apache.hadoop.hbase.io.ByteBufferInputStream; import org.apache.hadoop.hbase.io.ByteBufferInputStream;
import org.apache.hadoop.hbase.io.ByteBufferListOutputStream; import org.apache.hadoop.hbase.io.ByteBufferListOutputStream;
import org.apache.hadoop.hbase.io.ByteBufferOutputStream; import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
import org.apache.hadoop.hbase.io.ByteBufferPool;
import org.apache.hadoop.hbase.nio.ByteBuff; import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.nio.SingleByteBuff; import org.apache.hadoop.hbase.nio.SingleByteBuff;
import org.apache.hadoop.hbase.util.ClassSize; import org.apache.hadoop.hbase.util.ClassSize;
@ -208,7 +208,7 @@ class CellBlockBuilder {
* @param codec to use for encoding * @param codec to use for encoding
* @param compressor to use for encoding * @param compressor to use for encoding
* @param cellScanner to encode * @param cellScanner to encode
* @param pool Pool of ByteBuffers to make use of. * @param allocator to allocate the {@link ByteBuff}.
* @return Null or byte buffer filled with a cellblock filled with passed-in Cells encoded using * @return Null or byte buffer filled with a cellblock filled with passed-in Cells encoded using
* passed in <code>codec</code> and/or <code>compressor</code>; the returned buffer has * passed in <code>codec</code> and/or <code>compressor</code>; the returned buffer has
* been flipped and is ready for reading. Use limit to find total size. If * been flipped and is ready for reading. Use limit to find total size. If
@ -217,15 +217,14 @@ class CellBlockBuilder {
* @throws IOException if encoding the cells fail * @throws IOException if encoding the cells fail
*/ */
public ByteBufferListOutputStream buildCellBlockStream(Codec codec, CompressionCodec compressor, public ByteBufferListOutputStream buildCellBlockStream(Codec codec, CompressionCodec compressor,
CellScanner cellScanner, ByteBufferPool pool) throws IOException { CellScanner cellScanner, ByteBuffAllocator allocator) throws IOException {
if (cellScanner == null) { if (cellScanner == null) {
return null; return null;
} }
if (codec == null) { if (codec == null) {
throw new CellScannerButNoCodecException(); throw new CellScannerButNoCodecException();
} }
assert pool != null; ByteBufferListOutputStream bbos = new ByteBufferListOutputStream(allocator);
ByteBufferListOutputStream bbos = new ByteBufferListOutputStream(pool);
encodeCellsTo(bbos, cellScanner, codec, compressor); encodeCellsTo(bbos, cellScanner, codec, compressor);
if (bbos.size() == 0) { if (bbos.size() == 0) {
bbos.releaseResources(); bbos.releaseResources();

View File

@ -150,6 +150,10 @@
<groupId>org.apache.hbase.thirdparty</groupId> <groupId>org.apache.hbase.thirdparty</groupId>
<artifactId>hbase-shaded-miscellaneous</artifactId> <artifactId>hbase-shaded-miscellaneous</artifactId>
</dependency> </dependency>
<dependency>
<groupId>org.apache.hbase.thirdparty</groupId>
<artifactId>hbase-shaded-netty</artifactId>
</dependency>
<dependency> <dependency>
<groupId>org.slf4j</groupId> <groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId> <artifactId>slf4j-api</artifactId>

View File

@ -0,0 +1,282 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.nio.MultiByteBuff;
import org.apache.hadoop.hbase.nio.SingleByteBuff;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
/**
* ByteBuffAllocator is used for allocating/freeing the ByteBuffers from/to NIO ByteBuffer pool, and
* it provide high-level interfaces for upstream. when allocating desired memory size, it will
* return {@link ByteBuff}, if we are sure that those ByteBuffers have reached the end of life
* cycle, we must do the {@link ByteBuff#release()} to return back the buffers to the pool,
* otherwise ByteBuffers leak will happen, and the NIO ByteBuffer pool may be exhausted. there's
* possible that the desired memory size is large than ByteBufferPool has, we'll downgrade to
* allocate ByteBuffers from heap which meaning the GC pressure may increase again. Of course, an
* better way is increasing the ByteBufferPool size if we detected this case. <br/>
* <br/>
* On the other hand, for better memory utilization, we have set an lower bound named
* minSizeForReservoirUse in this allocator, and if the desired size is less than
* minSizeForReservoirUse, the allocator will just allocate the ByteBuffer from heap and let the JVM
* free its memory, because it's too wasting to allocate a single fixed-size ByteBuffer for some
* small objects. <br/>
* <br/>
* We recommend to use this class to allocate/free {@link ByteBuff} in the RPC layer or the entire
* read/write path, because it hide the details of memory management and its APIs are more friendly
* to the upper layer.
*/
@InterfaceAudience.Private
public class ByteBuffAllocator {
private static final Logger LOG = LoggerFactory.getLogger(ByteBuffAllocator.class);
public static final String MAX_BUFFER_COUNT_KEY = "hbase.ipc.server.allocator.max.buffer.count";
public static final String BUFFER_SIZE_KEY = "hbase.ipc.server.allocator.buffer.size";
// 64 KB. Making it same as the chunk size what we will write/read to/from the socket channel.
public static final int DEFAULT_BUFFER_SIZE = 64 * 1024;
public static final String MIN_ALLOCATE_SIZE_KEY =
"hbase.ipc.server.reservoir.minimal.allocating.size";
public static final Recycler NONE = () -> {
};
public interface Recycler {
void free();
}
private final boolean reservoirEnabled;
private final int bufSize;
private final int maxBufCount;
private final AtomicInteger usedBufCount = new AtomicInteger(0);
private boolean maxPoolSizeInfoLevelLogged = false;
// If the desired size is at least this size, it'll allocated from ByteBufferPool, otherwise it'll
// allocated from heap for better utilization. We make this to be 1/6th of the pool buffer size.
private final int minSizeForReservoirUse;
private final Queue<ByteBuffer> buffers = new ConcurrentLinkedQueue<>();
/**
* Initialize an {@link ByteBuffAllocator} which will try to allocate ByteBuffers from off-heap if
* reservoir is enabled and the reservoir has enough buffers, otherwise the allocator will just
* allocate the insufficient buffers from on-heap to meet the requirement.
* @param conf which get the arguments to initialize the allocator.
* @param reservoirEnabled indicate whether the reservoir is enabled or disabled.
* @return ByteBuffAllocator to manage the byte buffers.
*/
public static ByteBuffAllocator create(Configuration conf, boolean reservoirEnabled) {
int poolBufSize = conf.getInt(BUFFER_SIZE_KEY, DEFAULT_BUFFER_SIZE);
if (reservoirEnabled) {
// The max number of buffers to be pooled in the ByteBufferPool. The default value been
// selected based on the #handlers configured. When it is read request, 2 MB is the max size
// at which we will send back one RPC request. Means max we need 2 MB for creating the
// response cell block. (Well it might be much lesser than this because in 2 MB size calc, we
// include the heap size overhead of each cells also.) Considering 2 MB, we will need
// (2 * 1024 * 1024) / poolBufSize buffers to make the response cell block. Pool buffer size
// is by default 64 KB.
// In case of read request, at the end of the handler process, we will make the response
// cellblock and add the Call to connection's response Q and a single Responder thread takes
// connections and responses from that one by one and do the socket write. So there is chances
// that by the time a handler originated response is actually done writing to socket and so
// released the BBs it used, the handler might have processed one more read req. On an avg 2x
// we consider and consider that also for the max buffers to pool
int bufsForTwoMB = (2 * 1024 * 1024) / poolBufSize;
int maxBuffCount =
conf.getInt(MAX_BUFFER_COUNT_KEY, conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT) * bufsForTwoMB * 2);
int minSizeForReservoirUse = conf.getInt(MIN_ALLOCATE_SIZE_KEY, poolBufSize / 6);
return new ByteBuffAllocator(true, maxBuffCount, poolBufSize, minSizeForReservoirUse);
} else {
return new ByteBuffAllocator(false, 0, poolBufSize, Integer.MAX_VALUE);
}
}
/**
* Initialize an {@link ByteBuffAllocator} which only allocate ByteBuffer from on-heap, it's
* designed for testing purpose or disabled reservoir case.
* @return allocator to allocate on-heap ByteBuffer.
*/
public static ByteBuffAllocator createOnHeap() {
return new ByteBuffAllocator(false, 0, DEFAULT_BUFFER_SIZE, Integer.MAX_VALUE);
}
@VisibleForTesting
ByteBuffAllocator(boolean reservoirEnabled, int maxBufCount, int bufSize,
int minSizeForReservoirUse) {
this.reservoirEnabled = reservoirEnabled;
this.maxBufCount = maxBufCount;
this.bufSize = bufSize;
this.minSizeForReservoirUse = minSizeForReservoirUse;
}
public boolean isReservoirEnabled() {
return reservoirEnabled;
}
@VisibleForTesting
public int getQueueSize() {
return this.buffers.size();
}
/**
* Allocate an buffer with buffer size from ByteBuffAllocator, Note to call the
* {@link ByteBuff#release()} if no need any more, otherwise the memory leak happen in NIO
* ByteBuffer pool.
* @return an ByteBuff with the buffer size.
*/
public SingleByteBuff allocateOneBuffer() {
if (isReservoirEnabled()) {
ByteBuffer bb = getBuffer();
if (bb != null) {
return new SingleByteBuff(() -> putbackBuffer(bb), bb);
}
}
// Allocated from heap, let the JVM free its memory.
return new SingleByteBuff(NONE, ByteBuffer.allocate(this.bufSize));
}
/**
* Allocate size bytes from the ByteBufAllocator, Note to call the {@link ByteBuff#release()} if
* no need any more, otherwise the memory leak happen in NIO ByteBuffer pool.
* @param size to allocate
* @return an ByteBuff with the desired size.
*/
public ByteBuff allocate(int size) {
if (size < 0) {
throw new IllegalArgumentException("size to allocate should >=0");
}
// If disabled the reservoir, just allocate it from on-heap.
if (!isReservoirEnabled() || size == 0) {
return new SingleByteBuff(NONE, ByteBuffer.allocate(size));
}
int reminder = size % bufSize;
int len = size / bufSize + (reminder > 0 ? 1 : 0);
List<ByteBuffer> bbs = new ArrayList<>(len);
// Allocate from ByteBufferPool until the remaining is less than minSizeForReservoirUse or
// reservoir is exhausted.
int remain = size;
while (remain >= minSizeForReservoirUse) {
ByteBuffer bb = this.getBuffer();
if (bb == null) {
break;
}
bbs.add(bb);
remain -= bufSize;
}
int lenFromReservoir = bbs.size();
if (remain > 0) {
// If the last ByteBuffer is too small or the reservoir can not provide more ByteBuffers, we
// just allocate the ByteBuffer from on-heap.
bbs.add(ByteBuffer.allocate(remain));
}
ByteBuff bb = wrap(bbs, () -> {
for (int i = 0; i < lenFromReservoir; i++) {
this.putbackBuffer(bbs.get(i));
}
});
bb.limit(size);
return bb;
}
public static ByteBuff wrap(ByteBuffer[] buffers, Recycler recycler) {
if (buffers == null || buffers.length == 0) {
throw new IllegalArgumentException("buffers shouldn't be null or empty");
}
return buffers.length == 1 ? new SingleByteBuff(recycler, buffers[0])
: new MultiByteBuff(recycler, buffers);
}
public static ByteBuff wrap(ByteBuffer[] buffers) {
return wrap(buffers, NONE);
}
public static ByteBuff wrap(List<ByteBuffer> buffers, Recycler recycler) {
if (buffers == null || buffers.size() == 0) {
throw new IllegalArgumentException("buffers shouldn't be null or empty");
}
return buffers.size() == 1 ? new SingleByteBuff(recycler, buffers.get(0))
: new MultiByteBuff(recycler, buffers.toArray(new ByteBuffer[0]));
}
public static ByteBuff wrap(List<ByteBuffer> buffers) {
return wrap(buffers, NONE);
}
/**
* @return One free DirectByteBuffer from the pool. If no free ByteBuffer and we have not reached
* the maximum pool size, it will create a new one and return. In case of max pool size
* also reached, will return null. When pool returned a ByteBuffer, make sure to return it
* back to pool after use.
*/
private ByteBuffer getBuffer() {
ByteBuffer bb = buffers.poll();
if (bb != null) {
// To reset the limit to capacity and position to 0, must clear here.
bb.clear();
return bb;
}
while (true) {
int c = this.usedBufCount.intValue();
if (c >= this.maxBufCount) {
if (!maxPoolSizeInfoLevelLogged) {
LOG.info("Pool already reached its max capacity : {} and no free buffers now. Consider "
+ "increasing the value for '{}' ?",
maxBufCount, MAX_BUFFER_COUNT_KEY);
maxPoolSizeInfoLevelLogged = true;
}
return null;
}
if (!this.usedBufCount.compareAndSet(c, c + 1)) {
continue;
}
return ByteBuffer.allocateDirect(bufSize);
}
}
/**
* Return back a ByteBuffer after its use. Don't read/write the ByteBuffer after the returning.
* @param buf ByteBuffer to return.
*/
private void putbackBuffer(ByteBuffer buf) {
if (buf.capacity() != bufSize || (reservoirEnabled ^ buf.isDirect())) {
LOG.warn("Trying to put a buffer, not created by this pool! Will be just ignored");
return;
}
buffers.offer(buf);
}
}

View File

@ -23,6 +23,8 @@ import java.nio.ByteBuffer;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.nio.SingleByteBuff;
import org.apache.hadoop.hbase.util.ByteBufferUtils; import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -39,18 +41,17 @@ import org.slf4j.LoggerFactory;
public class ByteBufferListOutputStream extends ByteBufferOutputStream { public class ByteBufferListOutputStream extends ByteBufferOutputStream {
private static final Logger LOG = LoggerFactory.getLogger(ByteBufferListOutputStream.class); private static final Logger LOG = LoggerFactory.getLogger(ByteBufferListOutputStream.class);
private ByteBufferPool pool; private ByteBuffAllocator allocator;
// Keep track of the BBs where bytes written to. We will first try to get a BB from the pool. If // Keep track of the BBs where bytes written to. We will first try to get a BB from the pool. If
// it is not available will make a new one our own and keep writing to that. We keep track of all // it is not available will make a new one our own and keep writing to that. We keep track of all
// the BBs that we got from pool, separately so that on closeAndPutbackBuffers, we can make sure // the BBs that we got from pool, separately so that on closeAndPutbackBuffers, we can make sure
// to return back all of them to pool // to return back all of them to pool
protected List<ByteBuffer> allBufs = new ArrayList<>(); protected List<SingleByteBuff> allBufs = new ArrayList<>();
protected List<ByteBuffer> bufsFromPool = new ArrayList<>();
private boolean lastBufFlipped = false;// Indicate whether the curBuf/lastBuf is flipped already private boolean lastBufFlipped = false;// Indicate whether the curBuf/lastBuf is flipped already
public ByteBufferListOutputStream(ByteBufferPool pool) { public ByteBufferListOutputStream(ByteBuffAllocator allocator) {
this.pool = pool; this.allocator = allocator;
allocateNewBuffer(); allocateNewBuffer();
} }
@ -58,18 +59,10 @@ public class ByteBufferListOutputStream extends ByteBufferOutputStream {
if (this.curBuf != null) { if (this.curBuf != null) {
this.curBuf.flip();// On the current buf set limit = pos and pos = 0. this.curBuf.flip();// On the current buf set limit = pos and pos = 0.
} }
// Get an initial BB to work with from the pool // Get an initial ByteBuffer from the allocator.
this.curBuf = this.pool.getBuffer(); SingleByteBuff sbb = allocator.allocateOneBuffer();
if (this.curBuf == null) { this.curBuf = sbb.nioByteBuffers()[0];
// No free BB at this moment. Make a new one. The pool returns off heap BBs. Don't make off this.allBufs.add(sbb);
// heap BB on demand. It is difficult to account for all such and so proper sizing of Max
// direct heap size. See HBASE-15525 also for more details.
// Make BB with same size of pool's buffer size.
this.curBuf = ByteBuffer.allocate(this.pool.getBufferSize());
} else {
this.bufsFromPool.add(this.curBuf);
}
this.allBufs.add(this.curBuf);
} }
@Override @Override
@ -118,11 +111,8 @@ public class ByteBufferListOutputStream extends ByteBufferOutputStream {
LOG.debug(e.toString(), e); LOG.debug(e.toString(), e);
} }
// Return back all the BBs to pool // Return back all the BBs to pool
if (this.bufsFromPool != null) { for (ByteBuff buf : this.allBufs) {
for (int i = 0; i < this.bufsFromPool.size(); i++) { buf.release();
this.pool.putbackBuffer(this.bufsFromPool.get(i));
}
this.bufsFromPool = null;
} }
this.allBufs = null; this.allBufs = null;
this.curBuf = null; this.curBuf = null;
@ -144,7 +134,11 @@ public class ByteBufferListOutputStream extends ByteBufferOutputStream {
// All the other BBs are already flipped while moving to the new BB. // All the other BBs are already flipped while moving to the new BB.
curBuf.flip(); curBuf.flip();
} }
return this.allBufs; List<ByteBuffer> bbs = new ArrayList<>(this.allBufs.size());
for (SingleByteBuff bb : this.allBufs) {
bbs.add(bb.nioByteBuffers()[0]);
}
return bbs;
} }
@Override @Override

View File

@ -1,155 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io;
import java.nio.ByteBuffer;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
/**
* Like Hadoops' ByteBufferPool only you do not specify desired size when getting a ByteBuffer. This
* pool keeps an upper bound on the count of ByteBuffers in the pool and a fixed size of ByteBuffer
* that it will create. When requested, if a free ByteBuffer is already present, it will return
* that. And when no free ByteBuffer available and we are below the max count, it will create a new
* one and return that.
*
* <p>
* Note: This pool returns off heap ByteBuffers by default. If on heap ByteBuffers to be pooled,
* pass 'directByteBuffer' as false while construction of the pool.
* <p>
* This class is thread safe.
*
* @see ByteBufferListOutputStream
*/
@InterfaceAudience.Private
public class ByteBufferPool {
private static final Logger LOG = LoggerFactory.getLogger(ByteBufferPool.class);
// TODO better config names?
// hbase.ipc.server.reservoir.initial.max -> hbase.ipc.server.reservoir.max.buffer.count
// hbase.ipc.server.reservoir.initial.buffer.size -> hbase.ipc.server.reservoir.buffer.size
public static final String MAX_POOL_SIZE_KEY = "hbase.ipc.server.reservoir.initial.max";
public static final String BUFFER_SIZE_KEY = "hbase.ipc.server.reservoir.initial.buffer.size";
public static final int DEFAULT_BUFFER_SIZE = 64 * 1024;// 64 KB. Making it same as the chunk size
// what we will write/read to/from the
// socket channel.
private final Queue<ByteBuffer> buffers = new ConcurrentLinkedQueue<>();
private final int bufferSize;
private final int maxPoolSize;
private AtomicInteger count; // Count of the BBs created already for this pool.
private final boolean directByteBuffer; //Whether this pool should return DirectByteBuffers
private boolean maxPoolSizeInfoLevelLogged = false;
/**
* @param bufferSize Size of each buffer created by this pool.
* @param maxPoolSize Max number of buffers to keep in this pool.
*/
public ByteBufferPool(int bufferSize, int maxPoolSize) {
this(bufferSize, maxPoolSize, true);
}
/**
* @param bufferSize Size of each buffer created by this pool.
* @param maxPoolSize Max number of buffers to keep in this pool.
* @param directByteBuffer Whether to create direct ByteBuffer or on heap ByteBuffer.
*/
public ByteBufferPool(int bufferSize, int maxPoolSize, boolean directByteBuffer) {
this.bufferSize = bufferSize;
this.maxPoolSize = maxPoolSize;
this.directByteBuffer = directByteBuffer;
// TODO can add initialPoolSize config also and make those many BBs ready for use.
LOG.info("Created with bufferSize={} and maxPoolSize={}",
org.apache.hadoop.util.StringUtils.byteDesc(bufferSize),
org.apache.hadoop.util.StringUtils.byteDesc(maxPoolSize));
this.count = new AtomicInteger(0);
}
/**
* @return One free ByteBuffer from the pool. If no free ByteBuffer and we have not reached the
* maximum pool size, it will create a new one and return. In case of max pool size also
* reached, will return null. When pool returned a ByteBuffer, make sure to return it back
* to pool after use.
* @see #putbackBuffer(ByteBuffer)
*/
public ByteBuffer getBuffer() {
ByteBuffer bb = buffers.poll();
if (bb != null) {
// Clear sets limit == capacity. Position == 0.
bb.clear();
return bb;
}
while (true) {
int c = this.count.intValue();
if (c >= this.maxPoolSize) {
if (maxPoolSizeInfoLevelLogged) {
if (LOG.isDebugEnabled()) {
LOG.debug("Pool already reached its max capacity : " + this.maxPoolSize
+ " and no free buffers now. Consider increasing the value for '"
+ MAX_POOL_SIZE_KEY + "' ?");
}
} else {
LOG.info("Pool already reached its max capacity : " + this.maxPoolSize
+ " and no free buffers now. Consider increasing the value for '" + MAX_POOL_SIZE_KEY
+ "' ?");
maxPoolSizeInfoLevelLogged = true;
}
return null;
}
if (!this.count.compareAndSet(c, c + 1)) {
continue;
}
if (LOG.isTraceEnabled()) {
LOG.trace("Creating a new offheap ByteBuffer of size: " + this.bufferSize);
}
return this.directByteBuffer ? ByteBuffer.allocateDirect(this.bufferSize)
: ByteBuffer.allocate(this.bufferSize);
}
}
/**
* Return back a ByteBuffer after its use. Do not try to return put back a ByteBuffer, not
* obtained from this pool.
* @param buf ByteBuffer to return.
*/
public void putbackBuffer(ByteBuffer buf) {
if (buf.capacity() != this.bufferSize || (this.directByteBuffer ^ buf.isDirect())) {
LOG.warn("Trying to put a buffer, not created by this pool! Will be just ignored");
return;
}
buffers.offer(buf);
}
public int getBufferSize() {
return this.bufferSize;
}
/**
* @return Number of free buffers
*/
@VisibleForTesting
public int getQueueSize() {
return buffers.size();
}
}

View File

@ -98,7 +98,7 @@ public class CopyKeyDataBlockEncoder extends BufferedDataBlockEncoder {
currentBuffer.skip(current.tagsLength); currentBuffer.skip(current.tagsLength);
} }
if (includesMvcc()) { if (includesMvcc()) {
current.memstoreTS = ByteBuff.readVLong(currentBuffer); current.memstoreTS = ByteBufferUtils.readVLong(currentBuffer);
} else { } else {
current.memstoreTS = 0; current.memstoreTS = 0;
} }

View File

@ -477,7 +477,7 @@ public class DiffKeyDeltaEncoder extends BufferedDataBlockEncoder {
decodeTags(); decodeTags();
} }
if (includesMvcc()) { if (includesMvcc()) {
current.memstoreTS = ByteBuff.readVLong(currentBuffer); current.memstoreTS = ByteBufferUtils.readVLong(currentBuffer);
} else { } else {
current.memstoreTS = 0; current.memstoreTS = 0;
} }

View File

@ -501,7 +501,7 @@ public class FastDiffDeltaEncoder extends BufferedDataBlockEncoder {
decodeTags(); decodeTags();
} }
if (includesMvcc()) { if (includesMvcc()) {
current.memstoreTS = ByteBuff.readVLong(currentBuffer); current.memstoreTS = ByteBufferUtils.readVLong(currentBuffer);
} else { } else {
current.memstoreTS = 0; current.memstoreTS = 0;
} }

View File

@ -213,7 +213,7 @@ public class PrefixKeyDeltaEncoder extends BufferedDataBlockEncoder {
decodeTags(); decodeTags();
} }
if (includesMvcc()) { if (includesMvcc()) {
current.memstoreTS = ByteBuff.readVLong(currentBuffer); current.memstoreTS = ByteBufferUtils.readVLong(currentBuffer);
} else { } else {
current.memstoreTS = 0; current.memstoreTS = 0;
} }

View File

@ -282,7 +282,7 @@ public class RowIndexSeekerV1 extends AbstractEncodedSeeker {
decodeTags(); decodeTags();
} }
if (includesMvcc()) { if (includesMvcc()) {
current.memstoreTS = ByteBuff.readVLong(currentBuffer); current.memstoreTS = ByteBufferUtils.readVLong(currentBuffer);
} else { } else {
current.memstoreTS = 0; current.memstoreTS = 0;
} }

View File

@ -24,22 +24,81 @@ import java.nio.channels.ReadableByteChannel;
import org.apache.hadoop.hbase.util.ByteBufferUtils; import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ObjectIntPair; import org.apache.hadoop.hbase.util.ObjectIntPair;
import org.apache.hadoop.io.WritableUtils;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.io.netty.util.ReferenceCounted;
import org.apache.hbase.thirdparty.io.netty.util.internal.ObjectUtil;
/** /**
* An abstract class that abstracts out as to how the byte buffers are used, * An abstract class that abstracts out as to how the byte buffers are used, either single or
* either single or multiple. We have this interface because the java's ByteBuffers * multiple. We have this interface because the java's ByteBuffers cannot be sub-classed. This class
* cannot be sub-classed. This class provides APIs similar to the ones provided * provides APIs similar to the ones provided in java's nio ByteBuffers and allows you to do
* in java's nio ByteBuffers and allows you to do positional reads/writes and relative * positional reads/writes and relative reads and writes on the underlying BB. In addition to it, we
* reads and writes on the underlying BB. In addition to it, we have some additional APIs which * have some additional APIs which helps us in the read path. <br/>
* helps us in the read path. * The ByteBuff implement {@link ReferenceCounted} interface which mean need to maintains a
* {@link RefCnt} inside, if ensure that the ByteBuff won't be used any more, we must do a
* {@link ByteBuff#release()} to recycle its NIO ByteBuffers. when considering the
* {@link ByteBuff#duplicate()} or {@link ByteBuff#slice()}, releasing either the duplicated one or
* the original one will free its memory, because they share the same NIO ByteBuffers. when you want
* to retain the NIO ByteBuffers even if the origin one called {@link ByteBuff#release()}, you can
* do like this:
*
* <pre>
* ByteBuff original = ...;
* ByteBuff dup = original.duplicate();
* dup.retain();
* original.release();
* // The NIO buffers can still be accessed unless you release the duplicated one
* dup.get(...);
* dup.release();
* // Both the original and dup can not access the NIO buffers any more.
* </pre>
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
// TODO to have another name. This can easily get confused with netty's ByteBuf public abstract class ByteBuff implements ReferenceCounted {
public abstract class ByteBuff { private static final String REFERENCE_COUNT_NAME = "ReferenceCount";
private static final int NIO_BUFFER_LIMIT = 64 * 1024; // should not be more than 64KB. private static final int NIO_BUFFER_LIMIT = 64 * 1024; // should not be more than 64KB.
protected RefCnt refCnt;
/*************************** Methods for reference count **********************************/
protected void checkRefCount() {
ObjectUtil.checkPositive(refCnt(), REFERENCE_COUNT_NAME);
}
public int refCnt() {
return refCnt.refCnt();
}
@Override
public boolean release() {
return refCnt.release();
}
@Override
public final ByteBuff retain(int increment) {
throw new UnsupportedOperationException();
}
@Override
public final boolean release(int increment) {
throw new UnsupportedOperationException();
}
@Override
public final ByteBuff touch() {
throw new UnsupportedOperationException();
}
@Override
public final ByteBuff touch(Object hint) {
throw new UnsupportedOperationException();
}
/******************************* Methods for ByteBuff **************************************/
/** /**
* @return this ByteBuff's current position * @return this ByteBuff's current position
*/ */
@ -491,78 +550,11 @@ public abstract class ByteBuff {
return tmpLength; return tmpLength;
} }
/** public abstract ByteBuffer[] nioByteBuffers();
* Similar to {@link WritableUtils#readVLong(java.io.DataInput)} but reads from a
* {@link ByteBuff}.
*/
public static long readVLong(ByteBuff in) {
byte firstByte = in.get();
int len = WritableUtils.decodeVIntSize(firstByte);
if (len == 1) {
return firstByte;
}
long i = 0;
for (int idx = 0; idx < len-1; idx++) {
byte b = in.get();
i = i << 8;
i = i | (b & 0xFF);
}
return (WritableUtils.isNegativeVInt(firstByte) ? (i ^ -1L) : i);
}
/**
* Search sorted array "a" for byte "key".
*
* @param a Array to search. Entries must be sorted and unique.
* @param fromIndex First index inclusive of "a" to include in the search.
* @param toIndex Last index exclusive of "a" to include in the search.
* @param key The byte to search for.
* @return The index of key if found. If not found, return -(index + 1), where
* negative indicates "not found" and the "index + 1" handles the "-0"
* case.
*/
public static int unsignedBinarySearch(ByteBuff a, int fromIndex, int toIndex, byte key) {
int unsignedKey = key & 0xff;
int low = fromIndex;
int high = toIndex - 1;
while (low <= high) {
int mid = low + ((high - low) >> 1);
int midVal = a.get(mid) & 0xff;
if (midVal < unsignedKey) {
low = mid + 1;
} else if (midVal > unsignedKey) {
high = mid - 1;
} else {
return mid; // key found
}
}
return -(low + 1); // key not found.
}
@Override @Override
public String toString() { public String toString() {
return this.getClass().getSimpleName() + "[pos=" + position() + ", lim=" + limit() + return this.getClass().getSimpleName() + "[pos=" + position() + ", lim=" + limit() +
", cap= " + capacity() + "]"; ", cap= " + capacity() + "]";
} }
public static String toStringBinary(final ByteBuff b, int off, int len) {
StringBuilder result = new StringBuilder();
// Just in case we are passed a 'len' that is > buffer length...
if (off >= b.capacity())
return result.toString();
if (off + len > b.capacity())
len = b.capacity() - off;
for (int i = off; i < off + len; ++i) {
int ch = b.get(i) & 0xFF;
if ((ch >= '0' && ch <= '9') || (ch >= 'A' && ch <= 'Z') || (ch >= 'a' && ch <= 'z')
|| " `~!@#$%^&*()-_=+[]{}|;:'\",.<>/?".indexOf(ch) >= 0) {
result.append((char) ch);
} else {
result.append(String.format("\\x%02X", ch));
}
}
return result.toString();
}
} }

View File

@ -17,6 +17,8 @@
*/ */
package org.apache.hadoop.hbase.nio; package org.apache.hadoop.hbase.nio;
import static org.apache.hadoop.hbase.io.ByteBuffAllocator.NONE;
import java.io.IOException; import java.io.IOException;
import java.nio.BufferOverflowException; import java.nio.BufferOverflowException;
import java.nio.BufferUnderflowException; import java.nio.BufferUnderflowException;
@ -24,13 +26,12 @@ import java.nio.ByteBuffer;
import java.nio.InvalidMarkException; import java.nio.InvalidMarkException;
import java.nio.channels.ReadableByteChannel; import java.nio.channels.ReadableByteChannel;
import org.apache.hadoop.hbase.io.ByteBuffAllocator.Recycler;
import org.apache.hadoop.hbase.util.ByteBufferUtils; import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ObjectIntPair; import org.apache.hadoop.hbase.util.ObjectIntPair;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
/** /**
* Provides a unified view of all the underlying ByteBuffers and will look as if a bigger * Provides a unified view of all the underlying ByteBuffers and will look as if a bigger
* sequential buffer. This class provides similar APIs as in {@link ByteBuffer} to put/get int, * sequential buffer. This class provides similar APIs as in {@link ByteBuffer} to put/get int,
@ -53,6 +54,15 @@ public class MultiByteBuff extends ByteBuff {
private final int[] itemBeginPos; private final int[] itemBeginPos;
public MultiByteBuff(ByteBuffer... items) { public MultiByteBuff(ByteBuffer... items) {
this(NONE, items);
}
public MultiByteBuff(Recycler recycler, ByteBuffer... items) {
this(new RefCnt(recycler), items);
}
private MultiByteBuff(RefCnt refCnt, ByteBuffer... items) {
this.refCnt = refCnt;
assert items != null; assert items != null;
assert items.length > 0; assert items.length > 0;
this.items = items; this.items = items;
@ -75,8 +85,9 @@ public class MultiByteBuff extends ByteBuff {
this.limitedItemIndex = this.items.length - 1; this.limitedItemIndex = this.items.length - 1;
} }
private MultiByteBuff(ByteBuffer[] items, int[] itemBeginPos, int limit, int limitedIndex, private MultiByteBuff(RefCnt refCnt, ByteBuffer[] items, int[] itemBeginPos, int limit,
int curItemIndex, int markedIndex) { int limitedIndex, int curItemIndex, int markedIndex) {
this.refCnt = refCnt;
this.items = items; this.items = items;
this.curItemIndex = curItemIndex; this.curItemIndex = curItemIndex;
this.curItem = this.items[this.curItemIndex]; this.curItem = this.items[this.curItemIndex];
@ -117,6 +128,7 @@ public class MultiByteBuff extends ByteBuff {
*/ */
@Override @Override
public int capacity() { public int capacity() {
checkRefCount();
int c = 0; int c = 0;
for (ByteBuffer item : this.items) { for (ByteBuffer item : this.items) {
c += item.capacity(); c += item.capacity();
@ -131,12 +143,14 @@ public class MultiByteBuff extends ByteBuff {
*/ */
@Override @Override
public byte get(int index) { public byte get(int index) {
checkRefCount();
int itemIndex = getItemIndex(index); int itemIndex = getItemIndex(index);
return ByteBufferUtils.toByte(this.items[itemIndex], index - this.itemBeginPos[itemIndex]); return ByteBufferUtils.toByte(this.items[itemIndex], index - this.itemBeginPos[itemIndex]);
} }
@Override @Override
public byte getByteAfterPosition(int offset) { public byte getByteAfterPosition(int offset) {
checkRefCount();
// Mostly the index specified will land within this current item. Short circuit for that // Mostly the index specified will land within this current item. Short circuit for that
int index = offset + this.position(); int index = offset + this.position();
int itemIndex = getItemIndexFromCurItemIndex(index); int itemIndex = getItemIndexFromCurItemIndex(index);
@ -179,6 +193,7 @@ public class MultiByteBuff extends ByteBuff {
*/ */
@Override @Override
public int getInt(int index) { public int getInt(int index) {
checkRefCount();
// Mostly the index specified will land within this current item. Short circuit for that // Mostly the index specified will land within this current item. Short circuit for that
int itemIndex; int itemIndex;
if (this.itemBeginPos[this.curItemIndex] <= index if (this.itemBeginPos[this.curItemIndex] <= index
@ -192,6 +207,7 @@ public class MultiByteBuff extends ByteBuff {
@Override @Override
public int getIntAfterPosition(int offset) { public int getIntAfterPosition(int offset) {
checkRefCount();
// Mostly the index specified will land within this current item. Short circuit for that // Mostly the index specified will land within this current item. Short circuit for that
int index = offset + this.position(); int index = offset + this.position();
int itemIndex; int itemIndex;
@ -210,6 +226,7 @@ public class MultiByteBuff extends ByteBuff {
*/ */
@Override @Override
public short getShort(int index) { public short getShort(int index) {
checkRefCount();
// Mostly the index specified will land within this current item. Short circuit for that // Mostly the index specified will land within this current item. Short circuit for that
int itemIndex; int itemIndex;
if (this.itemBeginPos[this.curItemIndex] <= index if (this.itemBeginPos[this.curItemIndex] <= index
@ -238,6 +255,7 @@ public class MultiByteBuff extends ByteBuff {
@Override @Override
public short getShortAfterPosition(int offset) { public short getShortAfterPosition(int offset) {
checkRefCount();
// Mostly the index specified will land within this current item. Short circuit for that // Mostly the index specified will land within this current item. Short circuit for that
int index = offset + this.position(); int index = offset + this.position();
int itemIndex; int itemIndex;
@ -319,6 +337,7 @@ public class MultiByteBuff extends ByteBuff {
*/ */
@Override @Override
public long getLong(int index) { public long getLong(int index) {
checkRefCount();
// Mostly the index specified will land within this current item. Short circuit for that // Mostly the index specified will land within this current item. Short circuit for that
int itemIndex; int itemIndex;
if (this.itemBeginPos[this.curItemIndex] <= index if (this.itemBeginPos[this.curItemIndex] <= index
@ -332,6 +351,7 @@ public class MultiByteBuff extends ByteBuff {
@Override @Override
public long getLongAfterPosition(int offset) { public long getLongAfterPosition(int offset) {
checkRefCount();
// Mostly the index specified will land within this current item. Short circuit for that // Mostly the index specified will land within this current item. Short circuit for that
int index = offset + this.position(); int index = offset + this.position();
int itemIndex; int itemIndex;
@ -348,6 +368,7 @@ public class MultiByteBuff extends ByteBuff {
*/ */
@Override @Override
public int position() { public int position() {
checkRefCount();
return itemBeginPos[this.curItemIndex] + this.curItem.position(); return itemBeginPos[this.curItemIndex] + this.curItem.position();
} }
@ -358,6 +379,7 @@ public class MultiByteBuff extends ByteBuff {
*/ */
@Override @Override
public MultiByteBuff position(int position) { public MultiByteBuff position(int position) {
checkRefCount();
// Short circuit for positioning within the cur item. Mostly that is the case. // Short circuit for positioning within the cur item. Mostly that is the case.
if (this.itemBeginPos[this.curItemIndex] <= position if (this.itemBeginPos[this.curItemIndex] <= position
&& this.itemBeginPos[this.curItemIndex + 1] > position) { && this.itemBeginPos[this.curItemIndex + 1] > position) {
@ -385,6 +407,7 @@ public class MultiByteBuff extends ByteBuff {
*/ */
@Override @Override
public MultiByteBuff rewind() { public MultiByteBuff rewind() {
checkRefCount();
for (int i = 0; i < this.items.length; i++) { for (int i = 0; i < this.items.length; i++) {
this.items[i].rewind(); this.items[i].rewind();
} }
@ -400,6 +423,7 @@ public class MultiByteBuff extends ByteBuff {
*/ */
@Override @Override
public MultiByteBuff mark() { public MultiByteBuff mark() {
checkRefCount();
this.markedItemIndex = this.curItemIndex; this.markedItemIndex = this.curItemIndex;
this.curItem.mark(); this.curItem.mark();
return this; return this;
@ -412,6 +436,7 @@ public class MultiByteBuff extends ByteBuff {
*/ */
@Override @Override
public MultiByteBuff reset() { public MultiByteBuff reset() {
checkRefCount();
// when the buffer is moved to the next one.. the reset should happen on the previous marked // when the buffer is moved to the next one.. the reset should happen on the previous marked
// item and the new one should be taken as the base // item and the new one should be taken as the base
if (this.markedItemIndex < 0) throw new InvalidMarkException(); if (this.markedItemIndex < 0) throw new InvalidMarkException();
@ -433,6 +458,7 @@ public class MultiByteBuff extends ByteBuff {
*/ */
@Override @Override
public int remaining() { public int remaining() {
checkRefCount();
int remain = 0; int remain = 0;
for (int i = curItemIndex; i < items.length; i++) { for (int i = curItemIndex; i < items.length; i++) {
remain += items[i].remaining(); remain += items[i].remaining();
@ -446,6 +472,7 @@ public class MultiByteBuff extends ByteBuff {
*/ */
@Override @Override
public final boolean hasRemaining() { public final boolean hasRemaining() {
checkRefCount();
return this.curItem.hasRemaining() || (this.curItemIndex < this.limitedItemIndex return this.curItem.hasRemaining() || (this.curItemIndex < this.limitedItemIndex
&& this.items[this.curItemIndex + 1].hasRemaining()); && this.items[this.curItemIndex + 1].hasRemaining());
} }
@ -457,6 +484,7 @@ public class MultiByteBuff extends ByteBuff {
*/ */
@Override @Override
public byte get() { public byte get() {
checkRefCount();
if (this.curItem.remaining() == 0) { if (this.curItem.remaining() == 0) {
if (items.length - 1 == this.curItemIndex) { if (items.length - 1 == this.curItemIndex) {
// means cur item is the last one and we wont be able to read a long. Throw exception // means cur item is the last one and we wont be able to read a long. Throw exception
@ -476,6 +504,7 @@ public class MultiByteBuff extends ByteBuff {
*/ */
@Override @Override
public short getShort() { public short getShort() {
checkRefCount();
int remaining = this.curItem.remaining(); int remaining = this.curItem.remaining();
if (remaining >= Bytes.SIZEOF_SHORT) { if (remaining >= Bytes.SIZEOF_SHORT) {
return this.curItem.getShort(); return this.curItem.getShort();
@ -494,6 +523,7 @@ public class MultiByteBuff extends ByteBuff {
*/ */
@Override @Override
public int getInt() { public int getInt() {
checkRefCount();
int remaining = this.curItem.remaining(); int remaining = this.curItem.remaining();
if (remaining >= Bytes.SIZEOF_INT) { if (remaining >= Bytes.SIZEOF_INT) {
return this.curItem.getInt(); return this.curItem.getInt();
@ -514,6 +544,7 @@ public class MultiByteBuff extends ByteBuff {
*/ */
@Override @Override
public long getLong() { public long getLong() {
checkRefCount();
int remaining = this.curItem.remaining(); int remaining = this.curItem.remaining();
if (remaining >= Bytes.SIZEOF_LONG) { if (remaining >= Bytes.SIZEOF_LONG) {
return this.curItem.getLong(); return this.curItem.getLong();
@ -545,6 +576,7 @@ public class MultiByteBuff extends ByteBuff {
*/ */
@Override @Override
public void get(byte[] dst, int offset, int length) { public void get(byte[] dst, int offset, int length) {
checkRefCount();
while (length > 0) { while (length > 0) {
int toRead = Math.min(length, this.curItem.remaining()); int toRead = Math.min(length, this.curItem.remaining());
ByteBufferUtils.copyFromBufferToArray(dst, this.curItem, this.curItem.position(), offset, ByteBufferUtils.copyFromBufferToArray(dst, this.curItem, this.curItem.position(), offset,
@ -560,6 +592,7 @@ public class MultiByteBuff extends ByteBuff {
@Override @Override
public void get(int sourceOffset, byte[] dst, int offset, int length) { public void get(int sourceOffset, byte[] dst, int offset, int length) {
checkRefCount();
int itemIndex = getItemIndex(sourceOffset); int itemIndex = getItemIndex(sourceOffset);
ByteBuffer item = this.items[itemIndex]; ByteBuffer item = this.items[itemIndex];
sourceOffset = sourceOffset - this.itemBeginPos[itemIndex]; sourceOffset = sourceOffset - this.itemBeginPos[itemIndex];
@ -583,6 +616,7 @@ public class MultiByteBuff extends ByteBuff {
*/ */
@Override @Override
public MultiByteBuff limit(int limit) { public MultiByteBuff limit(int limit) {
checkRefCount();
this.limit = limit; this.limit = limit;
// Normally the limit will try to limit within the last BB item // Normally the limit will try to limit within the last BB item
int limitedIndexBegin = this.itemBeginPos[this.limitedItemIndex]; int limitedIndexBegin = this.itemBeginPos[this.limitedItemIndex];
@ -622,29 +656,30 @@ public class MultiByteBuff extends ByteBuff {
*/ */
@Override @Override
public MultiByteBuff slice() { public MultiByteBuff slice() {
checkRefCount();
ByteBuffer[] copy = new ByteBuffer[this.limitedItemIndex - this.curItemIndex + 1]; ByteBuffer[] copy = new ByteBuffer[this.limitedItemIndex - this.curItemIndex + 1];
for (int i = curItemIndex, j = 0; i <= this.limitedItemIndex; i++, j++) { for (int i = curItemIndex, j = 0; i <= this.limitedItemIndex; i++, j++) {
copy[j] = this.items[i].slice(); copy[j] = this.items[i].slice();
} }
return new MultiByteBuff(copy); return new MultiByteBuff(refCnt, copy);
} }
/** /**
* Returns an MBB which is a duplicate version of this MBB. The position, limit and mark * Returns an MBB which is a duplicate version of this MBB. The position, limit and mark of the
* of the new MBB will be independent than that of the original MBB. * new MBB will be independent than that of the original MBB. The content of the new MBB will
* The content of the new MBB will start at this MBB's current position * start at this MBB's current position The position, limit and mark of the new MBB would be
* The position, limit and mark of the new MBB would be identical to this MBB in terms of * identical to this MBB in terms of values.
* values. * @return a duplicated MBB
* @return a sliced MBB
*/ */
@Override @Override
public MultiByteBuff duplicate() { public MultiByteBuff duplicate() {
checkRefCount();
ByteBuffer[] itemsCopy = new ByteBuffer[this.items.length]; ByteBuffer[] itemsCopy = new ByteBuffer[this.items.length];
for (int i = 0; i < this.items.length; i++) { for (int i = 0; i < this.items.length; i++) {
itemsCopy[i] = items[i].duplicate(); itemsCopy[i] = items[i].duplicate();
} }
return new MultiByteBuff(itemsCopy, this.itemBeginPos, this.limit, this.limitedItemIndex, return new MultiByteBuff(refCnt, itemsCopy, this.itemBeginPos, this.limit,
this.curItemIndex, this.markedItemIndex); this.limitedItemIndex, this.curItemIndex, this.markedItemIndex);
} }
/** /**
@ -654,6 +689,7 @@ public class MultiByteBuff extends ByteBuff {
*/ */
@Override @Override
public MultiByteBuff put(byte b) { public MultiByteBuff put(byte b) {
checkRefCount();
if (this.curItem.remaining() == 0) { if (this.curItem.remaining() == 0) {
if (this.curItemIndex == this.items.length - 1) { if (this.curItemIndex == this.items.length - 1) {
throw new BufferOverflowException(); throw new BufferOverflowException();
@ -673,6 +709,7 @@ public class MultiByteBuff extends ByteBuff {
*/ */
@Override @Override
public MultiByteBuff put(int index, byte b) { public MultiByteBuff put(int index, byte b) {
checkRefCount();
int itemIndex = getItemIndex(limit); int itemIndex = getItemIndex(limit);
ByteBuffer item = items[itemIndex]; ByteBuffer item = items[itemIndex];
item.put(index - itemBeginPos[itemIndex], b); item.put(index - itemBeginPos[itemIndex], b);
@ -688,6 +725,7 @@ public class MultiByteBuff extends ByteBuff {
*/ */
@Override @Override
public MultiByteBuff put(int offset, ByteBuff src, int srcOffset, int length) { public MultiByteBuff put(int offset, ByteBuff src, int srcOffset, int length) {
checkRefCount();
int destItemIndex = getItemIndex(offset); int destItemIndex = getItemIndex(offset);
int srcItemIndex = getItemIndex(srcOffset); int srcItemIndex = getItemIndex(srcOffset);
ByteBuffer destItem = this.items[destItemIndex]; ByteBuffer destItem = this.items[destItemIndex];
@ -723,7 +761,7 @@ public class MultiByteBuff extends ByteBuff {
} }
private static ByteBuffer getItemByteBuffer(ByteBuff buf, int index) { private static ByteBuffer getItemByteBuffer(ByteBuff buf, int index) {
return (buf instanceof SingleByteBuff) ? ((SingleByteBuff) buf).getEnclosingByteBuffer() return (buf instanceof SingleByteBuff) ? buf.nioByteBuffers()[0]
: ((MultiByteBuff) buf).items[index]; : ((MultiByteBuff) buf).items[index];
} }
@ -734,6 +772,7 @@ public class MultiByteBuff extends ByteBuff {
*/ */
@Override @Override
public MultiByteBuff putInt(int val) { public MultiByteBuff putInt(int val) {
checkRefCount();
if (this.curItem.remaining() >= Bytes.SIZEOF_INT) { if (this.curItem.remaining() >= Bytes.SIZEOF_INT) {
this.curItem.putInt(val); this.curItem.putInt(val);
return this; return this;
@ -784,6 +823,7 @@ public class MultiByteBuff extends ByteBuff {
*/ */
@Override @Override
public MultiByteBuff put(byte[] src, int offset, int length) { public MultiByteBuff put(byte[] src, int offset, int length) {
checkRefCount();
if (this.curItem.remaining() >= length) { if (this.curItem.remaining() >= length) {
ByteBufferUtils.copyFromArrayToBuffer(this.curItem, src, offset, length); ByteBufferUtils.copyFromArrayToBuffer(this.curItem, src, offset, length);
return this; return this;
@ -803,6 +843,7 @@ public class MultiByteBuff extends ByteBuff {
*/ */
@Override @Override
public MultiByteBuff putLong(long val) { public MultiByteBuff putLong(long val) {
checkRefCount();
if (this.curItem.remaining() >= Bytes.SIZEOF_LONG) { if (this.curItem.remaining() >= Bytes.SIZEOF_LONG) {
this.curItem.putLong(val); this.curItem.putLong(val);
return this; return this;
@ -860,6 +901,7 @@ public class MultiByteBuff extends ByteBuff {
*/ */
@Override @Override
public MultiByteBuff skip(int length) { public MultiByteBuff skip(int length) {
checkRefCount();
// Get available bytes from this item and remaining from next // Get available bytes from this item and remaining from next
int jump = 0; int jump = 0;
while (true) { while (true) {
@ -882,6 +924,7 @@ public class MultiByteBuff extends ByteBuff {
*/ */
@Override @Override
public MultiByteBuff moveBack(int length) { public MultiByteBuff moveBack(int length) {
checkRefCount();
while (length != 0) { while (length != 0) {
if (length > curItem.position()) { if (length > curItem.position()) {
length -= curItem.position(); length -= curItem.position();
@ -909,6 +952,7 @@ public class MultiByteBuff extends ByteBuff {
*/ */
@Override @Override
public ByteBuffer asSubByteBuffer(int length) { public ByteBuffer asSubByteBuffer(int length) {
checkRefCount();
if (this.curItem.remaining() >= length) { if (this.curItem.remaining() >= length) {
return this.curItem; return this.curItem;
} }
@ -918,8 +962,8 @@ public class MultiByteBuff extends ByteBuff {
ByteBuffer locCurItem = curItem; ByteBuffer locCurItem = curItem;
while (length > 0) { while (length > 0) {
int toRead = Math.min(length, locCurItem.remaining()); int toRead = Math.min(length, locCurItem.remaining());
ByteBufferUtils ByteBufferUtils.copyFromBufferToArray(dupB, locCurItem, locCurItem.position(), offset,
.copyFromBufferToArray(dupB, locCurItem, locCurItem.position(), offset, toRead); toRead);
length -= toRead; length -= toRead;
if (length == 0) break; if (length == 0) break;
locCurItemIndex++; locCurItemIndex++;
@ -945,6 +989,7 @@ public class MultiByteBuff extends ByteBuff {
*/ */
@Override @Override
public void asSubByteBuffer(int offset, int length, ObjectIntPair<ByteBuffer> pair) { public void asSubByteBuffer(int offset, int length, ObjectIntPair<ByteBuffer> pair) {
checkRefCount();
if (this.itemBeginPos[this.curItemIndex] <= offset) { if (this.itemBeginPos[this.curItemIndex] <= offset) {
int relOffsetInCurItem = offset - this.itemBeginPos[this.curItemIndex]; int relOffsetInCurItem = offset - this.itemBeginPos[this.curItemIndex];
if (this.curItem.limit() - relOffsetInCurItem >= length) { if (this.curItem.limit() - relOffsetInCurItem >= length) {
@ -988,6 +1033,7 @@ public class MultiByteBuff extends ByteBuff {
@Override @Override
public void get(ByteBuffer out, int sourceOffset, public void get(ByteBuffer out, int sourceOffset,
int length) { int length) {
checkRefCount();
// Not used from real read path actually. So not going with // Not used from real read path actually. So not going with
// optimization // optimization
for (int i = 0; i < length; ++i) { for (int i = 0; i < length; ++i) {
@ -1007,6 +1053,7 @@ public class MultiByteBuff extends ByteBuff {
*/ */
@Override @Override
public byte[] toBytes(int offset, int length) { public byte[] toBytes(int offset, int length) {
checkRefCount();
byte[] output = new byte[length]; byte[] output = new byte[length];
this.get(offset, output, 0, length); this.get(offset, output, 0, length);
return output; return output;
@ -1014,6 +1061,7 @@ public class MultiByteBuff extends ByteBuff {
@Override @Override
public int read(ReadableByteChannel channel) throws IOException { public int read(ReadableByteChannel channel) throws IOException {
checkRefCount();
int total = 0; int total = 0;
while (true) { while (true) {
// Read max possible into the current BB // Read max possible into the current BB
@ -1033,6 +1081,12 @@ public class MultiByteBuff extends ByteBuff {
return total; return total;
} }
@Override
public ByteBuffer[] nioByteBuffers() {
checkRefCount();
return this.items;
}
@Override @Override
public boolean equals(Object obj) { public boolean equals(Object obj) {
if (!(obj instanceof MultiByteBuff)) return false; if (!(obj instanceof MultiByteBuff)) return false;
@ -1040,7 +1094,7 @@ public class MultiByteBuff extends ByteBuff {
MultiByteBuff that = (MultiByteBuff) obj; MultiByteBuff that = (MultiByteBuff) obj;
if (this.capacity() != that.capacity()) return false; if (this.capacity() != that.capacity()) return false;
if (ByteBuff.compareTo(this, this.position(), this.limit(), that, that.position(), if (ByteBuff.compareTo(this, this.position(), this.limit(), that, that.position(),
that.limit()) == 0) { that.limit()) == 0) {
return true; return true;
} }
return false; return false;
@ -1055,11 +1109,9 @@ public class MultiByteBuff extends ByteBuff {
return hash; return hash;
} }
/** @Override
* @return the ByteBuffers which this wraps. public MultiByteBuff retain() {
*/ refCnt.retain();
@VisibleForTesting return this;
public ByteBuffer[] getEnclosingByteBuffers() {
return this.items;
} }
} }

View File

@ -0,0 +1,49 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.nio;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.io.ByteBuffAllocator.Recycler;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.io.netty.util.AbstractReferenceCounted;
import org.apache.hbase.thirdparty.io.netty.util.ReferenceCounted;
/**
* Maintain an reference count integer inside to track life cycle of {@link ByteBuff}, if the
* reference count become 0, it'll call {@link Recycler#free()} once.
*/
@InterfaceAudience.Private
class RefCnt extends AbstractReferenceCounted {
private Recycler recycler = ByteBuffAllocator.NONE;
RefCnt(Recycler recycler) {
this.recycler = recycler;
}
@Override
protected final void deallocate() {
this.recycler.free();
}
@Override
public final ReferenceCounted touch(Object hint) {
throw new UnsupportedOperationException();
}
}

View File

@ -17,22 +17,24 @@
*/ */
package org.apache.hadoop.hbase.nio; package org.apache.hadoop.hbase.nio;
import static org.apache.hadoop.hbase.io.ByteBuffAllocator.NONE;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel; import java.nio.channels.ReadableByteChannel;
import org.apache.hadoop.hbase.io.ByteBuffAllocator.Recycler;
import org.apache.hadoop.hbase.util.ByteBufferUtils; import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.ObjectIntPair; import org.apache.hadoop.hbase.util.ObjectIntPair;
import org.apache.hadoop.hbase.util.UnsafeAccess; import org.apache.hadoop.hbase.util.UnsafeAccess;
import org.apache.hadoop.hbase.util.UnsafeAvailChecker; import org.apache.hadoop.hbase.util.UnsafeAvailChecker;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import sun.nio.ch.DirectBuffer; import sun.nio.ch.DirectBuffer;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
/** /**
* An implementation of ByteBuff where a single BB backs the BBI. This just acts * An implementation of ByteBuff where a single BB backs the BBI. This just acts as a wrapper over a
* as a wrapper over a normal BB - offheap or onheap * normal BB - offheap or onheap
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
public class SingleByteBuff extends ByteBuff { public class SingleByteBuff extends ByteBuff {
@ -48,6 +50,15 @@ public class SingleByteBuff extends ByteBuff {
private Object unsafeRef = null; private Object unsafeRef = null;
public SingleByteBuff(ByteBuffer buf) { public SingleByteBuff(ByteBuffer buf) {
this(NONE, buf);
}
public SingleByteBuff(Recycler recycler, ByteBuffer buf) {
this(new RefCnt(recycler), buf);
}
private SingleByteBuff(RefCnt refCnt, ByteBuffer buf) {
this.refCnt = refCnt;
this.buf = buf; this.buf = buf;
if (buf.hasArray()) { if (buf.hasArray()) {
this.unsafeOffset = UnsafeAccess.BYTE_ARRAY_BASE_OFFSET + buf.arrayOffset(); this.unsafeOffset = UnsafeAccess.BYTE_ARRAY_BASE_OFFSET + buf.arrayOffset();
@ -59,63 +70,74 @@ public class SingleByteBuff extends ByteBuff {
@Override @Override
public int position() { public int position() {
checkRefCount();
return this.buf.position(); return this.buf.position();
} }
@Override @Override
public SingleByteBuff position(int position) { public SingleByteBuff position(int position) {
checkRefCount();
this.buf.position(position); this.buf.position(position);
return this; return this;
} }
@Override @Override
public SingleByteBuff skip(int len) { public SingleByteBuff skip(int len) {
checkRefCount();
this.buf.position(this.buf.position() + len); this.buf.position(this.buf.position() + len);
return this; return this;
} }
@Override @Override
public SingleByteBuff moveBack(int len) { public SingleByteBuff moveBack(int len) {
checkRefCount();
this.buf.position(this.buf.position() - len); this.buf.position(this.buf.position() - len);
return this; return this;
} }
@Override @Override
public int capacity() { public int capacity() {
checkRefCount();
return this.buf.capacity(); return this.buf.capacity();
} }
@Override @Override
public int limit() { public int limit() {
checkRefCount();
return this.buf.limit(); return this.buf.limit();
} }
@Override @Override
public SingleByteBuff limit(int limit) { public SingleByteBuff limit(int limit) {
checkRefCount();
this.buf.limit(limit); this.buf.limit(limit);
return this; return this;
} }
@Override @Override
public SingleByteBuff rewind() { public SingleByteBuff rewind() {
checkRefCount();
this.buf.rewind(); this.buf.rewind();
return this; return this;
} }
@Override @Override
public SingleByteBuff mark() { public SingleByteBuff mark() {
checkRefCount();
this.buf.mark(); this.buf.mark();
return this; return this;
} }
@Override @Override
public ByteBuffer asSubByteBuffer(int length) { public ByteBuffer asSubByteBuffer(int length) {
checkRefCount();
// Just return the single BB that is available // Just return the single BB that is available
return this.buf; return this.buf;
} }
@Override @Override
public void asSubByteBuffer(int offset, int length, ObjectIntPair<ByteBuffer> pair) { public void asSubByteBuffer(int offset, int length, ObjectIntPair<ByteBuffer> pair) {
checkRefCount();
// Just return the single BB that is available // Just return the single BB that is available
pair.setFirst(this.buf); pair.setFirst(this.buf);
pair.setSecond(offset); pair.setSecond(offset);
@ -123,37 +145,44 @@ public class SingleByteBuff extends ByteBuff {
@Override @Override
public int remaining() { public int remaining() {
checkRefCount();
return this.buf.remaining(); return this.buf.remaining();
} }
@Override @Override
public boolean hasRemaining() { public boolean hasRemaining() {
checkRefCount();
return buf.hasRemaining(); return buf.hasRemaining();
} }
@Override @Override
public SingleByteBuff reset() { public SingleByteBuff reset() {
checkRefCount();
this.buf.reset(); this.buf.reset();
return this; return this;
} }
@Override @Override
public SingleByteBuff slice() { public SingleByteBuff slice() {
return new SingleByteBuff(this.buf.slice()); checkRefCount();
return new SingleByteBuff(this.refCnt, this.buf.slice());
} }
@Override @Override
public SingleByteBuff duplicate() { public SingleByteBuff duplicate() {
return new SingleByteBuff(this.buf.duplicate()); checkRefCount();
return new SingleByteBuff(this.refCnt, this.buf.duplicate());
} }
@Override @Override
public byte get() { public byte get() {
checkRefCount();
return buf.get(); return buf.get();
} }
@Override @Override
public byte get(int index) { public byte get(int index) {
checkRefCount();
if (UNSAFE_AVAIL) { if (UNSAFE_AVAIL) {
return UnsafeAccess.toByte(this.unsafeRef, this.unsafeOffset + index); return UnsafeAccess.toByte(this.unsafeRef, this.unsafeOffset + index);
} }
@ -162,29 +191,34 @@ public class SingleByteBuff extends ByteBuff {
@Override @Override
public byte getByteAfterPosition(int offset) { public byte getByteAfterPosition(int offset) {
checkRefCount();
return get(this.buf.position() + offset); return get(this.buf.position() + offset);
} }
@Override @Override
public SingleByteBuff put(byte b) { public SingleByteBuff put(byte b) {
checkRefCount();
this.buf.put(b); this.buf.put(b);
return this; return this;
} }
@Override @Override
public SingleByteBuff put(int index, byte b) { public SingleByteBuff put(int index, byte b) {
checkRefCount();
buf.put(index, b); buf.put(index, b);
return this; return this;
} }
@Override @Override
public void get(byte[] dst, int offset, int length) { public void get(byte[] dst, int offset, int length) {
checkRefCount();
ByteBufferUtils.copyFromBufferToArray(dst, buf, buf.position(), offset, length); ByteBufferUtils.copyFromBufferToArray(dst, buf, buf.position(), offset, length);
buf.position(buf.position() + length); buf.position(buf.position() + length);
} }
@Override @Override
public void get(int sourceOffset, byte[] dst, int offset, int length) { public void get(int sourceOffset, byte[] dst, int offset, int length) {
checkRefCount();
ByteBufferUtils.copyFromBufferToArray(dst, buf, sourceOffset, offset, length); ByteBufferUtils.copyFromBufferToArray(dst, buf, sourceOffset, offset, length);
} }
@ -195,9 +229,10 @@ public class SingleByteBuff extends ByteBuff {
@Override @Override
public SingleByteBuff put(int offset, ByteBuff src, int srcOffset, int length) { public SingleByteBuff put(int offset, ByteBuff src, int srcOffset, int length) {
checkRefCount();
if (src instanceof SingleByteBuff) { if (src instanceof SingleByteBuff) {
ByteBufferUtils.copyFromBufferToBuffer(((SingleByteBuff) src).buf, this.buf, srcOffset, ByteBufferUtils.copyFromBufferToBuffer(((SingleByteBuff) src).buf, this.buf, srcOffset,
offset, length); offset, length);
} else { } else {
// TODO we can do some optimization here? Call to asSubByteBuffer might // TODO we can do some optimization here? Call to asSubByteBuffer might
// create a copy. // create a copy.
@ -205,7 +240,7 @@ public class SingleByteBuff extends ByteBuff {
src.asSubByteBuffer(srcOffset, length, pair); src.asSubByteBuffer(srcOffset, length, pair);
if (pair.getFirst() != null) { if (pair.getFirst() != null) {
ByteBufferUtils.copyFromBufferToBuffer(pair.getFirst(), this.buf, pair.getSecond(), offset, ByteBufferUtils.copyFromBufferToBuffer(pair.getFirst(), this.buf, pair.getSecond(), offset,
length); length);
} }
} }
return this; return this;
@ -213,37 +248,44 @@ public class SingleByteBuff extends ByteBuff {
@Override @Override
public SingleByteBuff put(byte[] src, int offset, int length) { public SingleByteBuff put(byte[] src, int offset, int length) {
checkRefCount();
ByteBufferUtils.copyFromArrayToBuffer(this.buf, src, offset, length); ByteBufferUtils.copyFromArrayToBuffer(this.buf, src, offset, length);
return this; return this;
} }
@Override @Override
public SingleByteBuff put(byte[] src) { public SingleByteBuff put(byte[] src) {
checkRefCount();
return put(src, 0, src.length); return put(src, 0, src.length);
} }
@Override @Override
public boolean hasArray() { public boolean hasArray() {
checkRefCount();
return this.buf.hasArray(); return this.buf.hasArray();
} }
@Override @Override
public byte[] array() { public byte[] array() {
checkRefCount();
return this.buf.array(); return this.buf.array();
} }
@Override @Override
public int arrayOffset() { public int arrayOffset() {
checkRefCount();
return this.buf.arrayOffset(); return this.buf.arrayOffset();
} }
@Override @Override
public short getShort() { public short getShort() {
checkRefCount();
return this.buf.getShort(); return this.buf.getShort();
} }
@Override @Override
public short getShort(int index) { public short getShort(int index) {
checkRefCount();
if (UNSAFE_UNALIGNED) { if (UNSAFE_UNALIGNED) {
return UnsafeAccess.toShort(unsafeRef, unsafeOffset + index); return UnsafeAccess.toShort(unsafeRef, unsafeOffset + index);
} }
@ -252,22 +294,26 @@ public class SingleByteBuff extends ByteBuff {
@Override @Override
public short getShortAfterPosition(int offset) { public short getShortAfterPosition(int offset) {
checkRefCount();
return getShort(this.buf.position() + offset); return getShort(this.buf.position() + offset);
} }
@Override @Override
public int getInt() { public int getInt() {
checkRefCount();
return this.buf.getInt(); return this.buf.getInt();
} }
@Override @Override
public SingleByteBuff putInt(int value) { public SingleByteBuff putInt(int value) {
checkRefCount();
ByteBufferUtils.putInt(this.buf, value); ByteBufferUtils.putInt(this.buf, value);
return this; return this;
} }
@Override @Override
public int getInt(int index) { public int getInt(int index) {
checkRefCount();
if (UNSAFE_UNALIGNED) { if (UNSAFE_UNALIGNED) {
return UnsafeAccess.toInt(unsafeRef, unsafeOffset + index); return UnsafeAccess.toInt(unsafeRef, unsafeOffset + index);
} }
@ -276,22 +322,26 @@ public class SingleByteBuff extends ByteBuff {
@Override @Override
public int getIntAfterPosition(int offset) { public int getIntAfterPosition(int offset) {
checkRefCount();
return getInt(this.buf.position() + offset); return getInt(this.buf.position() + offset);
} }
@Override @Override
public long getLong() { public long getLong() {
checkRefCount();
return this.buf.getLong(); return this.buf.getLong();
} }
@Override @Override
public SingleByteBuff putLong(long value) { public SingleByteBuff putLong(long value) {
checkRefCount();
ByteBufferUtils.putLong(this.buf, value); ByteBufferUtils.putLong(this.buf, value);
return this; return this;
} }
@Override @Override
public long getLong(int index) { public long getLong(int index) {
checkRefCount();
if (UNSAFE_UNALIGNED) { if (UNSAFE_UNALIGNED) {
return UnsafeAccess.toLong(unsafeRef, unsafeOffset + index); return UnsafeAccess.toLong(unsafeRef, unsafeOffset + index);
} }
@ -300,11 +350,13 @@ public class SingleByteBuff extends ByteBuff {
@Override @Override
public long getLongAfterPosition(int offset) { public long getLongAfterPosition(int offset) {
checkRefCount();
return getLong(this.buf.position() + offset); return getLong(this.buf.position() + offset);
} }
@Override @Override
public byte[] toBytes(int offset, int length) { public byte[] toBytes(int offset, int length) {
checkRefCount();
byte[] output = new byte[length]; byte[] output = new byte[length];
ByteBufferUtils.copyFromBufferToArray(output, buf, offset, 0, length); ByteBufferUtils.copyFromBufferToArray(output, buf, offset, 0, length);
return output; return output;
@ -312,18 +364,28 @@ public class SingleByteBuff extends ByteBuff {
@Override @Override
public void get(ByteBuffer out, int sourceOffset, int length) { public void get(ByteBuffer out, int sourceOffset, int length) {
checkRefCount();
ByteBufferUtils.copyFromBufferToBuffer(buf, out, sourceOffset, length); ByteBufferUtils.copyFromBufferToBuffer(buf, out, sourceOffset, length);
} }
@Override @Override
public int read(ReadableByteChannel channel) throws IOException { public int read(ReadableByteChannel channel) throws IOException {
checkRefCount();
return channelRead(channel, buf); return channelRead(channel, buf);
} }
@Override
public ByteBuffer[] nioByteBuffers() {
checkRefCount();
return new ByteBuffer[] { this.buf };
}
@Override @Override
public boolean equals(Object obj) { public boolean equals(Object obj) {
if(!(obj instanceof SingleByteBuff)) return false; if (!(obj instanceof SingleByteBuff)) {
return this.buf.equals(((SingleByteBuff)obj).buf); return false;
}
return this.buf.equals(((SingleByteBuff) obj).buf);
} }
@Override @Override
@ -331,11 +393,9 @@ public class SingleByteBuff extends ByteBuff {
return this.buf.hashCode(); return this.buf.hashCode();
} }
/** @Override
* @return the ByteBuffer which this wraps. public SingleByteBuff retain() {
*/ refCnt.retain();
@VisibleForTesting return this;
public ByteBuffer getEnclosingByteBuffer() {
return this.buf;
} }
} }

View File

@ -27,9 +27,9 @@ import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.nio.ByteBuff; import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.nio.MultiByteBuff;
import org.apache.hadoop.hbase.nio.SingleByteBuff;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -311,10 +311,6 @@ public class ByteBufferArray {
srcIndex += cnt; srcIndex += cnt;
} }
assert srcIndex == len; assert srcIndex == len;
if (mbb.length > 1) { return ByteBuffAllocator.wrap(mbb);
return new MultiByteBuff(mbb);
} else {
return new SingleByteBuff(mbb[0]);
}
} }
} }

View File

@ -30,6 +30,7 @@ import java.util.Arrays;
import org.apache.hadoop.hbase.io.ByteBufferWriter; import org.apache.hadoop.hbase.io.ByteBufferWriter;
import org.apache.hadoop.hbase.io.util.StreamUtils; import org.apache.hadoop.hbase.io.util.StreamUtils;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.WritableUtils; import org.apache.hadoop.io.WritableUtils;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
@ -350,25 +351,39 @@ public final class ByteBufferUtils {
} }
} }
/** private interface ByteVisitor {
* Similar to {@link WritableUtils#readVLong(DataInput)} but reads from a byte get();
* {@link ByteBuffer}. }
*/
public static long readVLong(ByteBuffer in) { private static long readVLong(ByteVisitor visitor) {
byte firstByte = in.get(); byte firstByte = visitor.get();
int len = WritableUtils.decodeVIntSize(firstByte); int len = WritableUtils.decodeVIntSize(firstByte);
if (len == 1) { if (len == 1) {
return firstByte; return firstByte;
} }
long i = 0; long i = 0;
for (int idx = 0; idx < len-1; idx++) { for (int idx = 0; idx < len - 1; idx++) {
byte b = in.get(); byte b = visitor.get();
i = i << 8; i = i << 8;
i = i | (b & 0xFF); i = i | (b & 0xFF);
} }
return (WritableUtils.isNegativeVInt(firstByte) ? (i ^ -1L) : i); return (WritableUtils.isNegativeVInt(firstByte) ? (i ^ -1L) : i);
} }
/**
* Similar to {@link WritableUtils#readVLong(DataInput)} but reads from a {@link ByteBuffer}.
*/
public static long readVLong(ByteBuffer in) {
return readVLong(in::get);
}
/**
* Similar to {@link WritableUtils#readVLong(java.io.DataInput)} but reads from a
* {@link ByteBuff}.
*/
public static long readVLong(ByteBuff in) {
return readVLong(in::get);
}
/** /**
* Put in buffer integer using 7 bit encoding. For each written byte: * Put in buffer integer using 7 bit encoding. For each written byte:

View File

@ -0,0 +1,309 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.nio.ByteBuffer;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.nio.MultiByteBuff;
import org.apache.hadoop.hbase.nio.SingleByteBuff;
import org.apache.hadoop.hbase.testclassification.RPCTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category({ RPCTests.class, SmallTests.class })
public class TestByteBuffAllocator {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestByteBuffAllocator.class);
@Test
public void testAllocateByteBuffToReadInto() {
int maxBuffersInPool = 10;
int bufSize = 6 * 1024;
ByteBuffAllocator alloc = new ByteBuffAllocator(true, maxBuffersInPool, bufSize, bufSize / 6);
ByteBuff buff = alloc.allocate(10 * bufSize);
buff.release();
// When the request size is less than 1/6th of the pool buffer size. We should use on demand
// created on heap Buffer
buff = alloc.allocate(200);
assertTrue(buff.hasArray());
assertEquals(maxBuffersInPool, alloc.getQueueSize());
buff.release();
// When the request size is > 1/6th of the pool buffer size.
buff = alloc.allocate(1024);
assertFalse(buff.hasArray());
assertEquals(maxBuffersInPool - 1, alloc.getQueueSize());
buff.release();// ByteBuffDeallocaor#free should put back the BB to pool.
assertEquals(maxBuffersInPool, alloc.getQueueSize());
// Request size> pool buffer size
buff = alloc.allocate(7 * 1024);
assertFalse(buff.hasArray());
assertTrue(buff instanceof MultiByteBuff);
ByteBuffer[] bbs = buff.nioByteBuffers();
assertEquals(2, bbs.length);
assertTrue(bbs[0].isDirect());
assertTrue(bbs[1].isDirect());
assertEquals(6 * 1024, bbs[0].limit());
assertEquals(1024, bbs[1].limit());
assertEquals(maxBuffersInPool - 2, alloc.getQueueSize());
buff.release();
assertEquals(maxBuffersInPool, alloc.getQueueSize());
buff = alloc.allocate(6 * 1024 + 200);
assertFalse(buff.hasArray());
assertTrue(buff instanceof MultiByteBuff);
bbs = buff.nioByteBuffers();
assertEquals(2, bbs.length);
assertTrue(bbs[0].isDirect());
assertFalse(bbs[1].isDirect());
assertEquals(6 * 1024, bbs[0].limit());
assertEquals(200, bbs[1].limit());
assertEquals(maxBuffersInPool - 1, alloc.getQueueSize());
buff.release();
assertEquals(maxBuffersInPool, alloc.getQueueSize());
alloc.allocate(bufSize * (maxBuffersInPool - 1));
buff = alloc.allocate(20 * 1024);
assertFalse(buff.hasArray());
assertTrue(buff instanceof MultiByteBuff);
bbs = buff.nioByteBuffers();
assertEquals(2, bbs.length);
assertTrue(bbs[0].isDirect());
assertFalse(bbs[1].isDirect());
assertEquals(6 * 1024, bbs[0].limit());
assertEquals(14 * 1024, bbs[1].limit());
assertEquals(0, alloc.getQueueSize());
buff.release();
assertEquals(1, alloc.getQueueSize());
alloc.allocateOneBuffer();
buff = alloc.allocate(7 * 1024);
assertTrue(buff.hasArray());
assertTrue(buff instanceof SingleByteBuff);
assertEquals(7 * 1024, buff.nioByteBuffers()[0].limit());
buff.release();
}
@Test
public void testNegativeAllocatedSize() {
int maxBuffersInPool = 10;
ByteBuffAllocator allocator =
new ByteBuffAllocator(true, maxBuffersInPool, 6 * 1024, 1024);
try {
allocator.allocate(-1);
fail("Should throw exception when size < 0");
} catch (IllegalArgumentException e) {
// expected exception
}
ByteBuff bb = allocator.allocate(0);
bb.release();
}
@Test
public void testAllocateOneBuffer() {
// Allocate from on-heap
ByteBuffAllocator allocator = ByteBuffAllocator.createOnHeap();
ByteBuff buf = allocator.allocateOneBuffer();
assertTrue(buf.hasArray());
assertEquals(ByteBuffAllocator.DEFAULT_BUFFER_SIZE, buf.remaining());
buf.release();
// Allocate from off-heap
int bufSize = 10;
allocator = new ByteBuffAllocator(true, 1, 10, 3);
buf = allocator.allocateOneBuffer();
assertFalse(buf.hasArray());
assertEquals(buf.remaining(), bufSize);
// The another one will be allocated from on-heap because the pool has only one ByteBuffer,
// and still not be cleaned.
ByteBuff buf2 = allocator.allocateOneBuffer();
assertTrue(buf2.hasArray());
assertEquals(buf2.remaining(), bufSize);
// free the first one
buf.release();
// The next one will be off-heap again.
buf = allocator.allocateOneBuffer();
assertFalse(buf.hasArray());
assertEquals(buf.remaining(), bufSize);
buf.release();
}
@Test
public void testReferenceCount() {
int bufSize = 64;
ByteBuffAllocator alloc = new ByteBuffAllocator(true, 2, bufSize, 3);
ByteBuff buf1 = alloc.allocate(bufSize * 2);
assertFalse(buf1.hasArray());
// The next one will be allocated from heap
ByteBuff buf2 = alloc.allocateOneBuffer();
assertTrue(buf2.hasArray());
// duplicate the buf2, if the dup released, buf2 will also be released (SingleByteBuffer)
ByteBuff dup2 = buf2.duplicate();
dup2.release();
assertEquals(0, buf2.refCnt());
assertEquals(0, dup2.refCnt());
assertEquals(0, alloc.getQueueSize());
assertException(dup2::position);
assertException(buf2::position);
// duplicate the buf1, if the dup1 released, buf1 will also be released (MultipleByteBuffer)
ByteBuff dup1 = buf1.duplicate();
dup1.release();
assertEquals(0, buf1.refCnt());
assertEquals(0, dup1.refCnt());
assertEquals(2, alloc.getQueueSize());
assertException(dup1::position);
assertException(buf1::position);
// slice the buf3, if the slice3 released, buf3 will also be released (SingleByteBuffer)
ByteBuff buf3 = alloc.allocateOneBuffer();
assertFalse(buf3.hasArray());
ByteBuff slice3 = buf3.slice();
slice3.release();
assertEquals(0, buf3.refCnt());
assertEquals(0, slice3.refCnt());
assertEquals(2, alloc.getQueueSize());
// slice the buf4, if the slice4 released, buf4 will also be released (MultipleByteBuffer)
ByteBuff buf4 = alloc.allocate(bufSize * 2);
assertFalse(buf4.hasArray());
ByteBuff slice4 = buf4.slice();
slice4.release();
assertEquals(0, buf4.refCnt());
assertEquals(0, slice4.refCnt());
assertEquals(2, alloc.getQueueSize());
// Test multiple reference for the same ByteBuff (SingleByteBuff)
ByteBuff buf5 = alloc.allocateOneBuffer();
ByteBuff slice5 = buf5.duplicate().duplicate().duplicate().slice().slice();
slice5.release();
assertEquals(0, buf5.refCnt());
assertEquals(0, slice5.refCnt());
assertEquals(2, alloc.getQueueSize());
assertException(slice5::position);
assertException(buf5::position);
// Test multiple reference for the same ByteBuff (SingleByteBuff)
ByteBuff buf6 = alloc.allocate(bufSize >> 2);
ByteBuff slice6 = buf6.duplicate().duplicate().duplicate().slice().slice();
slice6.release();
assertEquals(0, buf6.refCnt());
assertEquals(0, slice6.refCnt());
assertEquals(2, alloc.getQueueSize());
// Test retain the parent SingleByteBuff (duplicate)
ByteBuff parent = alloc.allocateOneBuffer();
ByteBuff child = parent.duplicate();
child.retain();
parent.release();
assertEquals(1, child.refCnt());
assertEquals(1, parent.refCnt());
assertEquals(1, alloc.getQueueSize());
parent.release();
assertEquals(0, child.refCnt());
assertEquals(0, parent.refCnt());
assertEquals(2, alloc.getQueueSize());
// Test retain parent MultiByteBuff (duplicate)
parent = alloc.allocate(bufSize << 1);
child = parent.duplicate();
child.retain();
parent.release();
assertEquals(1, child.refCnt());
assertEquals(1, parent.refCnt());
assertEquals(0, alloc.getQueueSize());
parent.release();
assertEquals(0, child.refCnt());
assertEquals(0, parent.refCnt());
assertEquals(2, alloc.getQueueSize());
// Test retain the parent SingleByteBuff (slice)
parent = alloc.allocateOneBuffer();
child = parent.slice();
child.retain();
parent.release();
assertEquals(1, child.refCnt());
assertEquals(1, parent.refCnt());
assertEquals(1, alloc.getQueueSize());
parent.release();
assertEquals(0, child.refCnt());
assertEquals(0, parent.refCnt());
assertEquals(2, alloc.getQueueSize());
// Test retain parent MultiByteBuff (slice)
parent = alloc.allocate(bufSize << 1);
child = parent.slice();
child.retain();
parent.release();
assertEquals(1, child.refCnt());
assertEquals(1, parent.refCnt());
assertEquals(0, alloc.getQueueSize());
parent.release();
assertEquals(0, child.refCnt());
assertEquals(0, parent.refCnt());
assertEquals(2, alloc.getQueueSize());
}
@Test
public void testReverseRef() {
int bufSize = 64;
ByteBuffAllocator alloc = new ByteBuffAllocator(true, 1, bufSize, 3);
ByteBuff buf1 = alloc.allocate(bufSize);
ByteBuff dup1 = buf1.duplicate();
assertEquals(1, buf1.refCnt());
assertEquals(1, dup1.refCnt());
buf1.release();
assertEquals(0, buf1.refCnt());
assertEquals(0, dup1.refCnt());
assertEquals(1, alloc.getQueueSize());
assertException(buf1::position);
assertException(dup1::position);
}
@Test
public void testByteBuffUnsupportedMethods() {
int bufSize = 64;
ByteBuffAllocator alloc = new ByteBuffAllocator(true, 1, bufSize, 3);
ByteBuff buf = alloc.allocate(bufSize);
assertException(() -> buf.retain(2));
assertException(() -> buf.release(2));
assertException(() -> buf.touch());
assertException(() -> buf.touch(new Object()));
}
private void assertException(Runnable r) {
try {
r.run();
fail();
} catch (Exception e) {
// expected exception.
}
}
}

View File

@ -23,6 +23,7 @@ import static org.junit.Assert.assertTrue;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.List; import java.util.List;
import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.testclassification.IOTests; import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.ByteBufferUtils; import org.apache.hadoop.hbase.util.ByteBufferUtils;
@ -40,29 +41,30 @@ public class TestByteBufferListOutputStream {
@Test @Test
public void testWrites() throws Exception { public void testWrites() throws Exception {
ByteBufferPool pool = new ByteBufferPool(10, 3); ByteBuffAllocator alloc = new ByteBuffAllocator(true, 3, 10, 10 / 6);
ByteBufferListOutputStream bbos = new ByteBufferListOutputStream(pool); ByteBufferListOutputStream bbos = new ByteBufferListOutputStream(alloc);
bbos.write(2);// Write a byte bbos.write(2);// Write a byte
bbos.writeInt(100);// Write an int bbos.writeInt(100);// Write an int
byte[] b = Bytes.toBytes("row123");// 6 bytes byte[] b = Bytes.toBytes("row123");// 6 bytes
bbos.write(b); bbos.write(b);
assertEquals(2, bbos.allBufs.size());
// Just use the 3rd BB from pool so that pabos, on request, wont get one // Just use the 3rd BB from pool so that pabos, on request, wont get one
ByteBuffer bb1 = pool.getBuffer(); ByteBuff bb1 = alloc.allocateOneBuffer();
ByteBuffer bb = ByteBuffer.wrap(Bytes.toBytes("row123_cf1_q1"));// 13 bytes ByteBuffer bb = ByteBuffer.wrap(Bytes.toBytes("row123_cf1_q1"));// 13 bytes
bbos.write(bb, 0, bb.capacity()); bbos.write(bb, 0, bb.capacity());
pool.putbackBuffer(bb1); bb1.release();
bbos.writeInt(123); bbos.writeInt(123);
bbos.writeInt(124); bbos.writeInt(124);
assertEquals(0, pool.getQueueSize()); assertEquals(0, alloc.getQueueSize());
List<ByteBuffer> allBufs = bbos.getByteBuffers(); List<ByteBuffer> allBufs = bbos.getByteBuffers();
assertEquals(4, allBufs.size()); assertEquals(4, allBufs.size());
assertEquals(3, bbos.bufsFromPool.size()); assertEquals(4, bbos.allBufs.size());
ByteBuffer b1 = allBufs.get(0); ByteBuffer b1 = allBufs.get(0);
assertEquals(10, b1.remaining()); assertEquals(10, b1.remaining());
assertEquals(2, b1.get()); assertEquals(2, b1.get());
assertEquals(100, b1.getInt()); assertEquals(100, b1.getInt());
byte[] bActual = new byte[b.length]; byte[] bActual = new byte[b.length];
b1.get(bActual, 0, 5);//5 bytes in 1st BB b1.get(bActual, 0, 5);// 5 bytes in 1st BB
ByteBuffer b2 = allBufs.get(1); ByteBuffer b2 = allBufs.get(1);
assertEquals(10, b2.remaining()); assertEquals(10, b2.remaining());
b2.get(bActual, 5, 1);// Remaining 1 byte in 2nd BB b2.get(bActual, 5, 1);// Remaining 1 byte in 2nd BB
@ -78,6 +80,6 @@ public class TestByteBufferListOutputStream {
assertEquals(4, b4.remaining()); assertEquals(4, b4.remaining());
assertEquals(124, b4.getInt()); assertEquals(124, b4.getInt());
bbos.releaseResources(); bbos.releaseResources();
assertEquals(3, pool.getQueueSize()); assertEquals(3, alloc.getQueueSize());
} }
} }

View File

@ -1,67 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io;
import static org.junit.Assert.assertEquals;
import java.nio.ByteBuffer;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category({ IOTests.class, SmallTests.class })
public class TestByteBufferPool {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestByteBufferPool.class);
@Test
public void testOffheapBBPool() throws Exception {
boolean directByteBuffer = true;
testBBPool(10, 100, directByteBuffer);
}
@Test
public void testOnheapBBPool() throws Exception {
boolean directByteBuffer = false;
testBBPool(10, 100, directByteBuffer);
}
private void testBBPool(int maxPoolSize, int bufferSize, boolean directByteBuffer) {
ByteBufferPool pool = new ByteBufferPool(bufferSize, maxPoolSize, directByteBuffer);
for (int i = 0; i < maxPoolSize; i++) {
ByteBuffer buffer = pool.getBuffer();
assertEquals(0, buffer.position());
assertEquals(bufferSize, buffer.limit());
assertEquals(directByteBuffer, buffer.isDirect());
}
assertEquals(0, pool.getQueueSize());
ByteBuffer bb = directByteBuffer ? ByteBuffer.allocate(bufferSize)
: ByteBuffer.allocateDirect(bufferSize);
pool.putbackBuffer(bb);
assertEquals(0, pool.getQueueSize());
bb = directByteBuffer ? ByteBuffer.allocateDirect(bufferSize + 1)
: ByteBuffer.allocate(bufferSize + 1);
pool.putbackBuffer(bb);
assertEquals(0, pool.getQueueSize());
}
}

View File

@ -286,12 +286,12 @@ public class TestMultiByteBuff {
multi.putInt(45); multi.putInt(45);
multi.position(1); multi.position(1);
multi.limit(multi.position() + (2 * Bytes.SIZEOF_LONG)); multi.limit(multi.position() + (2 * Bytes.SIZEOF_LONG));
MultiByteBuff sliced = multi.slice(); ByteBuff sliced = multi.slice();
assertEquals(0, sliced.position()); assertEquals(0, sliced.position());
assertEquals((2 * Bytes.SIZEOF_LONG), sliced.limit()); assertEquals((2 * Bytes.SIZEOF_LONG), sliced.limit());
assertEquals(l1, sliced.getLong()); assertEquals(l1, sliced.getLong());
assertEquals(l2, sliced.getLong()); assertEquals(l2, sliced.getLong());
MultiByteBuff dup = multi.duplicate(); ByteBuff dup = multi.duplicate();
assertEquals(1, dup.position()); assertEquals(1, dup.position());
assertEquals(dup.position() + (2 * Bytes.SIZEOF_LONG), dup.limit()); assertEquals(dup.position() + (2 * Bytes.SIZEOF_LONG), dup.limit());
assertEquals(l1, dup.getLong()); assertEquals(l1, dup.getLong());

View File

@ -69,11 +69,10 @@ public interface Cacheable extends HeapSize {
/** /**
* SHARED means when this Cacheable is read back from cache it refers to the same memory area as * SHARED means when this Cacheable is read back from cache it refers to the same memory area as
* used by the cache for caching it. * used by the cache for caching it. EXCLUSIVE means when this Cacheable is read back from cache,
* EXCLUSIVE means when this Cacheable is read back from cache, the data was copied to an * the data was copied to an exclusive memory area of this Cacheable.
* exclusive memory area of this Cacheable.
*/ */
public static enum MemoryType { enum MemoryType {
SHARED, EXCLUSIVE; SHARED, EXCLUSIVE
} }
} }

View File

@ -127,7 +127,7 @@ public class NettyRpcFrameDecoder extends ByteToMessageDecoder {
NettyServerCall reqTooBig = NettyServerCall reqTooBig =
new NettyServerCall(header.getCallId(), connection.service, null, null, null, null, new NettyServerCall(header.getCallId(), connection.service, null, null, null, null,
connection, 0, connection.addr, System.currentTimeMillis(), 0, connection, 0, connection.addr, System.currentTimeMillis(), 0,
connection.rpcServer.reservoir, connection.rpcServer.cellBlockBuilder, null); connection.rpcServer.bbAllocator, connection.rpcServer.cellBlockBuilder, null);
connection.rpcServer.metrics.exception(SimpleRpcServer.REQUEST_TOO_BIG_EXCEPTION); connection.rpcServer.metrics.exception(SimpleRpcServer.REQUEST_TOO_BIG_EXCEPTION);

View File

@ -187,7 +187,7 @@ public class NettyRpcServer extends RpcServer {
Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status, Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status,
long startTime, int timeout) throws IOException { long startTime, int timeout) throws IOException {
NettyServerCall fakeCall = new NettyServerCall(-1, service, md, null, param, cellScanner, null, NettyServerCall fakeCall = new NettyServerCall(-1, service, md, null, param, cellScanner, null,
-1, null, receiveTime, timeout, reservoir, cellBlockBuilder, null); -1, null, receiveTime, timeout, bbAllocator, cellBlockBuilder, null);
return call(fakeCall, status); return call(fakeCall, status);
} }
} }

View File

@ -21,9 +21,9 @@ import java.io.IOException;
import java.net.InetAddress; import java.net.InetAddress;
import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.CellScanner;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.io.ByteBufferPool;
import org.apache.hadoop.hbase.ipc.RpcServer.CallCleanup; import org.apache.hadoop.hbase.ipc.RpcServer.CallCleanup;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService; import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor; import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
import org.apache.hbase.thirdparty.com.google.protobuf.Message; import org.apache.hbase.thirdparty.com.google.protobuf.Message;
@ -39,10 +39,10 @@ class NettyServerCall extends ServerCall<NettyServerRpcConnection> {
NettyServerCall(int id, BlockingService service, MethodDescriptor md, RequestHeader header, NettyServerCall(int id, BlockingService service, MethodDescriptor md, RequestHeader header,
Message param, CellScanner cellScanner, NettyServerRpcConnection connection, long size, Message param, CellScanner cellScanner, NettyServerRpcConnection connection, long size,
InetAddress remoteAddress, long receiveTime, int timeout, InetAddress remoteAddress, long receiveTime, int timeout, ByteBuffAllocator bbAllocator,
ByteBufferPool reservoir, CellBlockBuilder cellBlockBuilder, CallCleanup reqCleanup) { CellBlockBuilder cellBlockBuilder, CallCleanup reqCleanup) {
super(id, service, md, header, param, cellScanner, connection, size, remoteAddress, super(id, service, md, header, param, cellScanner, connection, size, remoteAddress, receiveTime,
receiveTime, timeout, reservoir, cellBlockBuilder, reqCleanup); timeout, bbAllocator, cellBlockBuilder, reqCleanup);
} }
/** /**

View File

@ -59,12 +59,7 @@ class NettyServerRpcConnection extends ServerRpcConnection {
void process(final ByteBuf buf) throws IOException, InterruptedException { void process(final ByteBuf buf) throws IOException, InterruptedException {
if (connectionHeaderRead) { if (connectionHeaderRead) {
this.callCleanup = new RpcServer.CallCleanup() { this.callCleanup = buf::release;
@Override
public void run() {
buf.release();
}
};
process(new SingleByteBuff(buf.nioBuffer())); process(new SingleByteBuff(buf.nioBuffer()));
} else { } else {
ByteBuffer connectionHeader = ByteBuffer.allocate(buf.readableBytes()); ByteBuffer connectionHeader = ByteBuffer.allocate(buf.readableBytes());
@ -121,7 +116,7 @@ class NettyServerRpcConnection extends ServerRpcConnection {
long size, final InetAddress remoteAddress, int timeout, long size, final InetAddress remoteAddress, int timeout,
CallCleanup reqCleanup) { CallCleanup reqCleanup) {
return new NettyServerCall(id, service, md, header, param, cellScanner, this, size, return new NettyServerCall(id, service, md, header, param, cellScanner, this, size,
remoteAddress, System.currentTimeMillis(), timeout, this.rpcServer.reservoir, remoteAddress, System.currentTimeMillis(), timeout, this.rpcServer.bbAllocator,
this.rpcServer.cellBlockBuilder, reqCleanup); this.rpcServer.cellBlockBuilder, reqCleanup);
} }

View File

@ -26,7 +26,6 @@ import java.net.InetSocketAddress;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel; import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel; import java.nio.channels.WritableByteChannel;
import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
@ -38,16 +37,12 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CallQueueTooBigException; import org.apache.hadoop.hbase.CallQueueTooBigException;
import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.conf.ConfigurationObserver; import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.hadoop.hbase.exceptions.RequestTooBigException; import org.apache.hadoop.hbase.exceptions.RequestTooBigException;
import org.apache.hadoop.hbase.io.ByteBufferPool; import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler; import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
import org.apache.hadoop.hbase.monitoring.TaskMonitor; import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.nio.MultiByteBuff;
import org.apache.hadoop.hbase.nio.SingleByteBuff;
import org.apache.hadoop.hbase.regionserver.RSRpcServices; import org.apache.hadoop.hbase.regionserver.RSRpcServices;
import org.apache.hadoop.hbase.security.SaslUtil; import org.apache.hadoop.hbase.security.SaslUtil;
import org.apache.hadoop.hbase.security.SaslUtil.QualityOfProtection; import org.apache.hadoop.hbase.security.SaslUtil.QualityOfProtection;
@ -210,11 +205,7 @@ public abstract class RpcServer implements RpcServerInterface,
protected UserProvider userProvider; protected UserProvider userProvider;
protected final ByteBufferPool reservoir; protected final ByteBuffAllocator bbAllocator;
// The requests and response will use buffers from ByteBufferPool, when the size of the
// request/response is at least this size.
// We make this to be 1/6th of the pool buffer size.
protected final int minSizeForReservoirUse;
protected volatile boolean allowFallbackToSimpleAuth; protected volatile boolean allowFallbackToSimpleAuth;
@ -225,7 +216,7 @@ public abstract class RpcServer implements RpcServerInterface,
private RSRpcServices rsRpcServices; private RSRpcServices rsRpcServices;
@FunctionalInterface @FunctionalInterface
protected static interface CallCleanup { protected interface CallCleanup {
void run(); void run();
} }
@ -266,32 +257,7 @@ public abstract class RpcServer implements RpcServerInterface,
final List<BlockingServiceAndInterface> services, final List<BlockingServiceAndInterface> services,
final InetSocketAddress bindAddress, Configuration conf, final InetSocketAddress bindAddress, Configuration conf,
RpcScheduler scheduler, boolean reservoirEnabled) throws IOException { RpcScheduler scheduler, boolean reservoirEnabled) throws IOException {
if (reservoirEnabled) { this.bbAllocator = ByteBuffAllocator.create(conf, reservoirEnabled);
int poolBufSize = conf.getInt(ByteBufferPool.BUFFER_SIZE_KEY,
ByteBufferPool.DEFAULT_BUFFER_SIZE);
// The max number of buffers to be pooled in the ByteBufferPool. The default value been
// selected based on the #handlers configured. When it is read request, 2 MB is the max size
// at which we will send back one RPC request. Means max we need 2 MB for creating the
// response cell block. (Well it might be much lesser than this because in 2 MB size calc, we
// include the heap size overhead of each cells also.) Considering 2 MB, we will need
// (2 * 1024 * 1024) / poolBufSize buffers to make the response cell block. Pool buffer size
// is by default 64 KB.
// In case of read request, at the end of the handler process, we will make the response
// cellblock and add the Call to connection's response Q and a single Responder thread takes
// connections and responses from that one by one and do the socket write. So there is chances
// that by the time a handler originated response is actually done writing to socket and so
// released the BBs it used, the handler might have processed one more read req. On an avg 2x
// we consider and consider that also for the max buffers to pool
int bufsForTwoMB = (2 * 1024 * 1024) / poolBufSize;
int maxPoolSize = conf.getInt(ByteBufferPool.MAX_POOL_SIZE_KEY,
conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT) * bufsForTwoMB * 2);
this.reservoir = new ByteBufferPool(poolBufSize, maxPoolSize);
this.minSizeForReservoirUse = getMinSizeForReservoirUse(this.reservoir);
} else {
reservoir = null;
this.minSizeForReservoirUse = Integer.MAX_VALUE;// reservoir itself not in place.
}
this.server = server; this.server = server;
this.services = services; this.services = services;
this.bindAddress = bindAddress; this.bindAddress = bindAddress;
@ -325,11 +291,6 @@ public abstract class RpcServer implements RpcServerInterface,
this.scheduler = scheduler; this.scheduler = scheduler;
} }
@VisibleForTesting
static int getMinSizeForReservoirUse(ByteBufferPool pool) {
return pool.getBufferSize() / 6;
}
@Override @Override
public void onConfigurationChange(Configuration newConf) { public void onConfigurationChange(Configuration newConf) {
initReconfigurable(newConf); initReconfigurable(newConf);
@ -673,55 +634,6 @@ public abstract class RpcServer implements RpcServerInterface,
return (nBytes > 0) ? nBytes : ret; return (nBytes > 0) ? nBytes : ret;
} }
/**
* This is extracted to a static method for better unit testing. We try to get buffer(s) from pool
* as much as possible.
*
* @param pool The ByteBufferPool to use
* @param minSizeForPoolUse Only for buffer size above this, we will try to use pool. Any buffer
* need of size below this, create on heap ByteBuffer.
* @param reqLen Bytes count in request
*/
@VisibleForTesting
static Pair<ByteBuff, CallCleanup> allocateByteBuffToReadInto(ByteBufferPool pool,
int minSizeForPoolUse, int reqLen) {
ByteBuff resultBuf;
List<ByteBuffer> bbs = new ArrayList<>((reqLen / pool.getBufferSize()) + 1);
int remain = reqLen;
ByteBuffer buf = null;
while (remain >= minSizeForPoolUse && (buf = pool.getBuffer()) != null) {
bbs.add(buf);
remain -= pool.getBufferSize();
}
ByteBuffer[] bufsFromPool = null;
if (bbs.size() > 0) {
bufsFromPool = new ByteBuffer[bbs.size()];
bbs.toArray(bufsFromPool);
}
if (remain > 0) {
bbs.add(ByteBuffer.allocate(remain));
}
if (bbs.size() > 1) {
ByteBuffer[] items = new ByteBuffer[bbs.size()];
bbs.toArray(items);
resultBuf = new MultiByteBuff(items);
} else {
// We are backed by single BB
resultBuf = new SingleByteBuff(bbs.get(0));
}
resultBuf.limit(reqLen);
if (bufsFromPool != null) {
final ByteBuffer[] bufsFromPoolFinal = bufsFromPool;
return new Pair<>(resultBuf, () -> {
// Return back all the BBs to pool
for (int i = 0; i < bufsFromPoolFinal.length; i++) {
pool.putbackBuffer(bufsFromPoolFinal[i]);
}
});
}
return new Pair<>(resultBuf, null);
}
/** /**
* Needed for features such as delayed calls. We need to be able to store the current call * Needed for features such as delayed calls. We need to be able to store the current call
* so that we can complete it later or ask questions of what is supported by the current ongoing * so that we can complete it later or ask questions of what is supported by the current ongoing

View File

@ -26,10 +26,10 @@ import java.util.Optional;
import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.exceptions.RegionMovedException; import org.apache.hadoop.hbase.exceptions.RegionMovedException;
import org.apache.hadoop.hbase.io.ByteBufferListOutputStream; import org.apache.hadoop.hbase.io.ByteBufferListOutputStream;
import org.apache.hadoop.hbase.io.ByteBufferPool;
import org.apache.hadoop.hbase.ipc.RpcServer.CallCleanup; import org.apache.hadoop.hbase.ipc.RpcServer.CallCleanup;
import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.User;
import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService; import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
@ -67,7 +67,7 @@ abstract class ServerCall<T extends ServerRpcConnection> implements RpcCall, Rpc
protected long startTime; protected long startTime;
protected final long deadline;// the deadline to handle this call, if exceed we can drop it. protected final long deadline;// the deadline to handle this call, if exceed we can drop it.
protected final ByteBufferPool reservoir; protected final ByteBuffAllocator bbAllocator;
protected final CellBlockBuilder cellBlockBuilder; protected final CellBlockBuilder cellBlockBuilder;
@ -91,11 +91,11 @@ abstract class ServerCall<T extends ServerRpcConnection> implements RpcCall, Rpc
private long exceptionSize = 0; private long exceptionSize = 0;
private final boolean retryImmediatelySupported; private final boolean retryImmediatelySupported;
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_NULL_ON_SOME_PATH", @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NP_NULL_ON_SOME_PATH",
justification="Can't figure why this complaint is happening... see below") justification = "Can't figure why this complaint is happening... see below")
ServerCall(int id, BlockingService service, MethodDescriptor md, RequestHeader header, ServerCall(int id, BlockingService service, MethodDescriptor md, RequestHeader header,
Message param, CellScanner cellScanner, T connection, long size, Message param, CellScanner cellScanner, T connection, long size, InetAddress remoteAddress,
InetAddress remoteAddress, long receiveTime, int timeout, ByteBufferPool reservoir, long receiveTime, int timeout, ByteBuffAllocator byteBuffAllocator,
CellBlockBuilder cellBlockBuilder, CallCleanup reqCleanup) { CellBlockBuilder cellBlockBuilder, CallCleanup reqCleanup) {
this.id = id; this.id = id;
this.service = service; this.service = service;
@ -118,7 +118,7 @@ abstract class ServerCall<T extends ServerRpcConnection> implements RpcCall, Rpc
this.remoteAddress = remoteAddress; this.remoteAddress = remoteAddress;
this.timeout = timeout; this.timeout = timeout;
this.deadline = this.timeout > 0 ? this.receiveTime + this.timeout : Long.MAX_VALUE; this.deadline = this.timeout > 0 ? this.receiveTime + this.timeout : Long.MAX_VALUE;
this.reservoir = reservoir; this.bbAllocator = byteBuffAllocator;
this.cellBlockBuilder = cellBlockBuilder; this.cellBlockBuilder = cellBlockBuilder;
this.reqCleanup = reqCleanup; this.reqCleanup = reqCleanup;
} }
@ -199,9 +199,9 @@ abstract class ServerCall<T extends ServerRpcConnection> implements RpcCall, Rpc
// high when we can avoid a big buffer allocation on each rpc. // high when we can avoid a big buffer allocation on each rpc.
List<ByteBuffer> cellBlock = null; List<ByteBuffer> cellBlock = null;
int cellBlockSize = 0; int cellBlockSize = 0;
if (this.reservoir != null) { if (bbAllocator.isReservoirEnabled()) {
this.cellBlockStream = this.cellBlockBuilder.buildCellBlockStream(this.connection.codec, this.cellBlockStream = this.cellBlockBuilder.buildCellBlockStream(this.connection.codec,
this.connection.compressionCodec, cells, this.reservoir); this.connection.compressionCodec, cells, bbAllocator);
if (this.cellBlockStream != null) { if (this.cellBlockStream != null) {
cellBlock = this.cellBlockStream.getByteBuffers(); cellBlock = this.cellBlockStream.getByteBuffers();
cellBlockSize = this.cellBlockStream.size(); cellBlockSize = this.cellBlockStream.size();

View File

@ -488,7 +488,7 @@ public class SimpleRpcServer extends RpcServer {
Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status, Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status,
long startTime, int timeout) throws IOException { long startTime, int timeout) throws IOException {
SimpleServerCall fakeCall = new SimpleServerCall(-1, service, md, null, param, cellScanner, SimpleServerCall fakeCall = new SimpleServerCall(-1, service, md, null, param, cellScanner,
null, -1, null, receiveTime, timeout, reservoir, cellBlockBuilder, null, null); null, -1, null, receiveTime, timeout, bbAllocator, cellBlockBuilder, null, null);
return call(fakeCall, status); return call(fakeCall, status);
} }

View File

@ -21,9 +21,9 @@ import java.io.IOException;
import java.net.InetAddress; import java.net.InetAddress;
import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.CellScanner;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.io.ByteBufferPool;
import org.apache.hadoop.hbase.ipc.RpcServer.CallCleanup; import org.apache.hadoop.hbase.ipc.RpcServer.CallCleanup;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService; import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor; import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
import org.apache.hbase.thirdparty.com.google.protobuf.Message; import org.apache.hbase.thirdparty.com.google.protobuf.Message;
@ -42,11 +42,12 @@ class SimpleServerCall extends ServerCall<SimpleServerRpcConnection> {
justification = "Can't figure why this complaint is happening... see below") justification = "Can't figure why this complaint is happening... see below")
SimpleServerCall(int id, final BlockingService service, final MethodDescriptor md, SimpleServerCall(int id, final BlockingService service, final MethodDescriptor md,
RequestHeader header, Message param, CellScanner cellScanner, RequestHeader header, Message param, CellScanner cellScanner,
SimpleServerRpcConnection connection, long size, SimpleServerRpcConnection connection, long size, final InetAddress remoteAddress,
final InetAddress remoteAddress, long receiveTime, int timeout, ByteBufferPool reservoir, long receiveTime, int timeout, ByteBuffAllocator bbAllocator,
CellBlockBuilder cellBlockBuilder, CallCleanup reqCleanup, SimpleRpcServerResponder responder) { CellBlockBuilder cellBlockBuilder, CallCleanup reqCleanup,
super(id, service, md, header, param, cellScanner, connection, size, remoteAddress, SimpleRpcServerResponder responder) {
receiveTime, timeout, reservoir, cellBlockBuilder, reqCleanup); super(id, service, md, header, param, cellScanner, connection, size, remoteAddress, receiveTime,
timeout, bbAllocator, cellBlockBuilder, reqCleanup);
this.responder = responder; this.responder = responder;
} }

View File

@ -36,14 +36,12 @@ import org.apache.hadoop.hbase.client.VersionInfoUtil;
import org.apache.hadoop.hbase.exceptions.RequestTooBigException; import org.apache.hadoop.hbase.exceptions.RequestTooBigException;
import org.apache.hadoop.hbase.ipc.RpcServer.CallCleanup; import org.apache.hadoop.hbase.ipc.RpcServer.CallCleanup;
import org.apache.hadoop.hbase.nio.ByteBuff; import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.nio.SingleByteBuff;
import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService; import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
import org.apache.hbase.thirdparty.com.google.protobuf.CodedInputStream; import org.apache.hbase.thirdparty.com.google.protobuf.CodedInputStream;
import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor; import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
import org.apache.hbase.thirdparty.com.google.protobuf.Message; import org.apache.hbase.thirdparty.com.google.protobuf.Message;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader; import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
import org.apache.hadoop.hbase.util.Pair;
/** Reads calls from a connection and queues them for handling. */ /** Reads calls from a connection and queues them for handling. */
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "VO_VOLATILE_INCREMENT", @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "VO_VOLATILE_INCREMENT",
@ -212,7 +210,7 @@ class SimpleServerRpcConnection extends ServerRpcConnection {
// Notify the client about the offending request // Notify the client about the offending request
SimpleServerCall reqTooBig = new SimpleServerCall(header.getCallId(), this.service, null, SimpleServerCall reqTooBig = new SimpleServerCall(header.getCallId(), this.service, null,
null, null, null, this, 0, this.addr, System.currentTimeMillis(), 0, null, null, null, this, 0, this.addr, System.currentTimeMillis(), 0,
this.rpcServer.reservoir, this.rpcServer.cellBlockBuilder, null, responder); this.rpcServer.bbAllocator, this.rpcServer.cellBlockBuilder, null, responder);
this.rpcServer.metrics.exception(SimpleRpcServer.REQUEST_TOO_BIG_EXCEPTION); this.rpcServer.metrics.exception(SimpleRpcServer.REQUEST_TOO_BIG_EXCEPTION);
// Make sure the client recognizes the underlying exception // Make sure the client recognizes the underlying exception
// Otherwise, throw a DoNotRetryIOException. // Otherwise, throw a DoNotRetryIOException.
@ -255,24 +253,8 @@ class SimpleServerRpcConnection extends ServerRpcConnection {
// It creates the ByteBuff and CallCleanup and assign to Connection instance. // It creates the ByteBuff and CallCleanup and assign to Connection instance.
private void initByteBuffToReadInto(int length) { private void initByteBuffToReadInto(int length) {
// We create random on heap buffers are read into those when this.data = rpcServer.bbAllocator.allocate(length);
// 1. ByteBufferPool is not there. this.callCleanup = data::release;
// 2. When the size of the req is very small. Using a large sized (64 KB) buffer from pool is
// waste then. Also if all the reqs are of this size, we will be creating larger sized
// buffers and pool them permanently. This include Scan/Get request and DDL kind of reqs like
// RegionOpen.
// 3. If it is an initial handshake signal or initial connection request. Any way then
// condition 2 itself will match
// 4. When SASL use is ON.
if (this.rpcServer.reservoir == null || skipInitialSaslHandshake || !connectionHeaderRead ||
useSasl || length < this.rpcServer.minSizeForReservoirUse) {
this.data = new SingleByteBuff(ByteBuffer.allocate(length));
} else {
Pair<ByteBuff, CallCleanup> pair = RpcServer.allocateByteBuffToReadInto(
this.rpcServer.reservoir, this.rpcServer.minSizeForReservoirUse, length);
this.data = pair.getFirst();
this.callCleanup = pair.getSecond();
}
} }
protected int channelDataRead(ReadableByteChannel channel, ByteBuff buf) throws IOException { protected int channelDataRead(ReadableByteChannel channel, ByteBuff buf) throws IOException {
@ -345,7 +327,7 @@ class SimpleServerRpcConnection extends ServerRpcConnection {
RequestHeader header, Message param, CellScanner cellScanner, long size, RequestHeader header, Message param, CellScanner cellScanner, long size,
InetAddress remoteAddress, int timeout, CallCleanup reqCleanup) { InetAddress remoteAddress, int timeout, CallCleanup reqCleanup) {
return new SimpleServerCall(id, service, md, header, param, cellScanner, this, size, return new SimpleServerCall(id, service, md, header, param, cellScanner, this, size,
remoteAddress, System.currentTimeMillis(), timeout, this.rpcServer.reservoir, remoteAddress, System.currentTimeMillis(), timeout, this.rpcServer.bbAllocator,
this.rpcServer.cellBlockBuilder, reqCleanup, this.responder); this.rpcServer.cellBlockBuilder, reqCleanup, this.responder);
} }

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.client; package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT; import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT;
import static org.apache.hadoop.hbase.io.ByteBuffAllocator.MAX_BUFFER_COUNT_KEY;
import static org.apache.hadoop.hbase.master.LoadBalancer.TABLES_ON_MASTER; import static org.apache.hadoop.hbase.master.LoadBalancer.TABLES_ON_MASTER;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
@ -42,7 +43,6 @@ import org.apache.hadoop.hbase.MemoryCompactionPolicy;
import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter.ExplainingPredicate; import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
import org.apache.hadoop.hbase.io.ByteBufferPool;
import org.apache.hadoop.hbase.regionserver.CompactingMemStore; import org.apache.hadoop.hbase.regionserver.CompactingMemStore;
import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.ClientTests;
@ -94,7 +94,7 @@ public class TestAsyncTableGetMultiThreaded {
protected static void setUp(MemoryCompactionPolicy memoryCompaction) throws Exception { protected static void setUp(MemoryCompactionPolicy memoryCompaction) throws Exception {
TEST_UTIL.getConfiguration().set(TABLES_ON_MASTER, "none"); TEST_UTIL.getConfiguration().set(TABLES_ON_MASTER, "none");
TEST_UTIL.getConfiguration().setLong(HBASE_CLIENT_META_OPERATION_TIMEOUT, 60000L); TEST_UTIL.getConfiguration().setLong(HBASE_CLIENT_META_OPERATION_TIMEOUT, 60000L);
TEST_UTIL.getConfiguration().setInt(ByteBufferPool.MAX_POOL_SIZE_KEY, 100); TEST_UTIL.getConfiguration().setInt(MAX_BUFFER_COUNT_KEY, 100);
TEST_UTIL.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, TEST_UTIL.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
String.valueOf(memoryCompaction)); String.valueOf(memoryCompaction));

View File

@ -17,6 +17,8 @@
*/ */
package org.apache.hadoop.hbase.client; package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.io.ByteBuffAllocator.BUFFER_SIZE_KEY;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
@ -26,7 +28,6 @@ import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.io.ByteBufferPool;
import org.apache.hadoop.hbase.ipc.NettyRpcServer; import org.apache.hadoop.hbase.ipc.NettyRpcServer;
import org.apache.hadoop.hbase.ipc.RpcServerFactory; import org.apache.hadoop.hbase.ipc.RpcServerFactory;
import org.apache.hadoop.hbase.ipc.SimpleRpcServer; import org.apache.hadoop.hbase.ipc.SimpleRpcServer;
@ -71,9 +72,8 @@ public class TestServerLoadDurability {
private static Configuration createConfigurationForSimpleRpcServer() { private static Configuration createConfigurationForSimpleRpcServer() {
Configuration conf = HBaseConfiguration.create(); Configuration conf = HBaseConfiguration.create();
conf.set(RpcServerFactory.CUSTOM_RPC_SERVER_IMPL_CONF_KEY, conf.set(RpcServerFactory.CUSTOM_RPC_SERVER_IMPL_CONF_KEY, SimpleRpcServer.class.getName());
SimpleRpcServer.class.getName()); conf.setInt(BUFFER_SIZE_KEY, 20);
conf.setInt(ByteBufferPool.BUFFER_SIZE_KEY, 20);
return conf; return conf;
} }

View File

@ -832,7 +832,7 @@ public class TestHFileBlock {
if (ClassSize.is32BitJVM()) { if (ClassSize.is32BitJVM()) {
assertEquals(64, HFileBlock.MULTI_BYTE_BUFFER_HEAP_SIZE); assertEquals(64, HFileBlock.MULTI_BYTE_BUFFER_HEAP_SIZE);
} else { } else {
assertEquals(72, HFileBlock.MULTI_BYTE_BUFFER_HEAP_SIZE); assertEquals(80, HFileBlock.MULTI_BYTE_BUFFER_HEAP_SIZE);
} }
for (int size : new int[] { 100, 256, 12345 }) { for (int size : new int[] { 100, 256, 12345 }) {

View File

@ -1,144 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.ipc;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.nio.ByteBuffer;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.io.ByteBufferPool;
import org.apache.hadoop.hbase.ipc.RpcServer.CallCleanup;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.nio.MultiByteBuff;
import org.apache.hadoop.hbase.nio.SingleByteBuff;
import org.apache.hadoop.hbase.testclassification.RPCTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Pair;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category({ RPCTests.class, SmallTests.class })
public class TestRpcServer {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestRpcServer.class);
@Test
public void testAllocateByteBuffToReadInto() throws Exception {
int maxBuffersInPool = 10;
ByteBufferPool pool = new ByteBufferPool(6 * 1024, maxBuffersInPool);
initPoolWithAllBuffers(pool, maxBuffersInPool);
ByteBuff buff = null;
Pair<ByteBuff, CallCleanup> pair;
// When the request size is less than 1/6th of the pool buffer size. We should use on demand
// created on heap Buffer
pair = RpcServer.allocateByteBuffToReadInto(pool, RpcServer.getMinSizeForReservoirUse(pool),
200);
buff = pair.getFirst();
assertTrue(buff.hasArray());
assertEquals(maxBuffersInPool, pool.getQueueSize());
assertNull(pair.getSecond());
// When the request size is > 1/6th of the pool buffer size.
pair = RpcServer.allocateByteBuffToReadInto(pool, RpcServer.getMinSizeForReservoirUse(pool),
1024);
buff = pair.getFirst();
assertFalse(buff.hasArray());
assertEquals(maxBuffersInPool - 1, pool.getQueueSize());
assertNotNull(pair.getSecond());
pair.getSecond().run();// CallCleanup#run should put back the BB to pool.
assertEquals(maxBuffersInPool, pool.getQueueSize());
// Request size> pool buffer size
pair = RpcServer.allocateByteBuffToReadInto(pool, RpcServer.getMinSizeForReservoirUse(pool),
7 * 1024);
buff = pair.getFirst();
assertFalse(buff.hasArray());
assertTrue(buff instanceof MultiByteBuff);
ByteBuffer[] bbs = ((MultiByteBuff) buff).getEnclosingByteBuffers();
assertEquals(2, bbs.length);
assertTrue(bbs[0].isDirect());
assertTrue(bbs[1].isDirect());
assertEquals(6 * 1024, bbs[0].limit());
assertEquals(1024, bbs[1].limit());
assertEquals(maxBuffersInPool - 2, pool.getQueueSize());
assertNotNull(pair.getSecond());
pair.getSecond().run();// CallCleanup#run should put back the BB to pool.
assertEquals(maxBuffersInPool, pool.getQueueSize());
pair = RpcServer.allocateByteBuffToReadInto(pool, RpcServer.getMinSizeForReservoirUse(pool),
6 * 1024 + 200);
buff = pair.getFirst();
assertFalse(buff.hasArray());
assertTrue(buff instanceof MultiByteBuff);
bbs = ((MultiByteBuff) buff).getEnclosingByteBuffers();
assertEquals(2, bbs.length);
assertTrue(bbs[0].isDirect());
assertFalse(bbs[1].isDirect());
assertEquals(6 * 1024, bbs[0].limit());
assertEquals(200, bbs[1].limit());
assertEquals(maxBuffersInPool - 1, pool.getQueueSize());
assertNotNull(pair.getSecond());
pair.getSecond().run();// CallCleanup#run should put back the BB to pool.
assertEquals(maxBuffersInPool, pool.getQueueSize());
ByteBuffer[] buffers = new ByteBuffer[maxBuffersInPool - 1];
for (int i = 0; i < maxBuffersInPool - 1; i++) {
buffers[i] = pool.getBuffer();
}
pair = RpcServer.allocateByteBuffToReadInto(pool, RpcServer.getMinSizeForReservoirUse(pool),
20 * 1024);
buff = pair.getFirst();
assertFalse(buff.hasArray());
assertTrue(buff instanceof MultiByteBuff);
bbs = ((MultiByteBuff) buff).getEnclosingByteBuffers();
assertEquals(2, bbs.length);
assertTrue(bbs[0].isDirect());
assertFalse(bbs[1].isDirect());
assertEquals(6 * 1024, bbs[0].limit());
assertEquals(14 * 1024, bbs[1].limit());
assertEquals(0, pool.getQueueSize());
assertNotNull(pair.getSecond());
pair.getSecond().run();// CallCleanup#run should put back the BB to pool.
assertEquals(1, pool.getQueueSize());
pool.getBuffer();
pair = RpcServer.allocateByteBuffToReadInto(pool, RpcServer.getMinSizeForReservoirUse(pool),
7 * 1024);
buff = pair.getFirst();
assertTrue(buff.hasArray());
assertTrue(buff instanceof SingleByteBuff);
assertEquals(7 * 1024, ((SingleByteBuff) buff).getEnclosingByteBuffer().limit());
assertNull(pair.getSecond());
}
private void initPoolWithAllBuffers(ByteBufferPool pool, int maxBuffersInPool) {
ByteBuffer[] buffers = new ByteBuffer[maxBuffersInPool];
// Just call getBuffer() on pool 'maxBuffersInPool' so as to init all buffers and then put back
// all. Makes pool with max #buffers.
for (int i = 0; i < maxBuffersInPool; i++) {
buffers[i] = pool.getBuffer();
}
for (ByteBuffer buf : buffers) {
pool.putbackBuffer(buf);
}
}
}