HBASE-21916 Abstract an ByteBuffAllocator to allocate/free ByteBuffer in ByteBufferPool

This commit is contained in:
huzheng 2019-02-16 17:16:09 +08:00
parent 9aee88e03a
commit 532ebfb7a5
34 changed files with 974 additions and 697 deletions

View File

@ -33,6 +33,7 @@ import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -41,7 +42,6 @@ import org.apache.hadoop.hbase.io.ByteBuffInputStream;
import org.apache.hadoop.hbase.io.ByteBufferInputStream;
import org.apache.hadoop.hbase.io.ByteBufferListOutputStream;
import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
import org.apache.hadoop.hbase.io.ByteBufferPool;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.nio.SingleByteBuff;
import org.apache.hadoop.hbase.util.ClassSize;
@ -208,7 +208,7 @@ class CellBlockBuilder {
* @param codec to use for encoding
* @param compressor to use for encoding
* @param cellScanner to encode
* @param pool Pool of ByteBuffers to make use of.
* @param allocator to allocate the {@link ByteBuff}.
* @return Null or byte buffer filled with a cellblock filled with passed-in Cells encoded using
* passed in <code>codec</code> and/or <code>compressor</code>; the returned buffer has
* been flipped and is ready for reading. Use limit to find total size. If
@ -217,15 +217,14 @@ class CellBlockBuilder {
* @throws IOException if encoding the cells fail
*/
public ByteBufferListOutputStream buildCellBlockStream(Codec codec, CompressionCodec compressor,
CellScanner cellScanner, ByteBufferPool pool) throws IOException {
CellScanner cellScanner, ByteBuffAllocator allocator) throws IOException {
if (cellScanner == null) {
return null;
}
if (codec == null) {
throw new CellScannerButNoCodecException();
}
assert pool != null;
ByteBufferListOutputStream bbos = new ByteBufferListOutputStream(pool);
ByteBufferListOutputStream bbos = new ByteBufferListOutputStream(allocator);
encodeCellsTo(bbos, cellScanner, codec, compressor);
if (bbos.size() == 0) {
bbos.releaseResources();

View File

@ -150,6 +150,10 @@
<groupId>org.apache.hbase.thirdparty</groupId>
<artifactId>hbase-shaded-miscellaneous</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hbase.thirdparty</groupId>
<artifactId>hbase-shaded-netty</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>

View File

@ -0,0 +1,282 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.nio.MultiByteBuff;
import org.apache.hadoop.hbase.nio.SingleByteBuff;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
/**
* ByteBuffAllocator is used for allocating/freeing the ByteBuffers from/to NIO ByteBuffer pool, and
* it provide high-level interfaces for upstream. when allocating desired memory size, it will
* return {@link ByteBuff}, if we are sure that those ByteBuffers have reached the end of life
* cycle, we must do the {@link ByteBuff#release()} to return back the buffers to the pool,
* otherwise ByteBuffers leak will happen, and the NIO ByteBuffer pool may be exhausted. there's
* possible that the desired memory size is large than ByteBufferPool has, we'll downgrade to
* allocate ByteBuffers from heap which meaning the GC pressure may increase again. Of course, an
* better way is increasing the ByteBufferPool size if we detected this case. <br/>
* <br/>
* On the other hand, for better memory utilization, we have set an lower bound named
* minSizeForReservoirUse in this allocator, and if the desired size is less than
* minSizeForReservoirUse, the allocator will just allocate the ByteBuffer from heap and let the JVM
* free its memory, because it's too wasting to allocate a single fixed-size ByteBuffer for some
* small objects. <br/>
* <br/>
* We recommend to use this class to allocate/free {@link ByteBuff} in the RPC layer or the entire
* read/write path, because it hide the details of memory management and its APIs are more friendly
* to the upper layer.
*/
@InterfaceAudience.Private
public class ByteBuffAllocator {
private static final Logger LOG = LoggerFactory.getLogger(ByteBuffAllocator.class);
public static final String MAX_BUFFER_COUNT_KEY = "hbase.ipc.server.allocator.max.buffer.count";
public static final String BUFFER_SIZE_KEY = "hbase.ipc.server.allocator.buffer.size";
// 64 KB. Making it same as the chunk size what we will write/read to/from the socket channel.
public static final int DEFAULT_BUFFER_SIZE = 64 * 1024;
public static final String MIN_ALLOCATE_SIZE_KEY =
"hbase.ipc.server.reservoir.minimal.allocating.size";
public static final Recycler NONE = () -> {
};
public interface Recycler {
void free();
}
private final boolean reservoirEnabled;
private final int bufSize;
private final int maxBufCount;
private final AtomicInteger usedBufCount = new AtomicInteger(0);
private boolean maxPoolSizeInfoLevelLogged = false;
// If the desired size is at least this size, it'll allocated from ByteBufferPool, otherwise it'll
// allocated from heap for better utilization. We make this to be 1/6th of the pool buffer size.
private final int minSizeForReservoirUse;
private final Queue<ByteBuffer> buffers = new ConcurrentLinkedQueue<>();
/**
* Initialize an {@link ByteBuffAllocator} which will try to allocate ByteBuffers from off-heap if
* reservoir is enabled and the reservoir has enough buffers, otherwise the allocator will just
* allocate the insufficient buffers from on-heap to meet the requirement.
* @param conf which get the arguments to initialize the allocator.
* @param reservoirEnabled indicate whether the reservoir is enabled or disabled.
* @return ByteBuffAllocator to manage the byte buffers.
*/
public static ByteBuffAllocator create(Configuration conf, boolean reservoirEnabled) {
int poolBufSize = conf.getInt(BUFFER_SIZE_KEY, DEFAULT_BUFFER_SIZE);
if (reservoirEnabled) {
// The max number of buffers to be pooled in the ByteBufferPool. The default value been
// selected based on the #handlers configured. When it is read request, 2 MB is the max size
// at which we will send back one RPC request. Means max we need 2 MB for creating the
// response cell block. (Well it might be much lesser than this because in 2 MB size calc, we
// include the heap size overhead of each cells also.) Considering 2 MB, we will need
// (2 * 1024 * 1024) / poolBufSize buffers to make the response cell block. Pool buffer size
// is by default 64 KB.
// In case of read request, at the end of the handler process, we will make the response
// cellblock and add the Call to connection's response Q and a single Responder thread takes
// connections and responses from that one by one and do the socket write. So there is chances
// that by the time a handler originated response is actually done writing to socket and so
// released the BBs it used, the handler might have processed one more read req. On an avg 2x
// we consider and consider that also for the max buffers to pool
int bufsForTwoMB = (2 * 1024 * 1024) / poolBufSize;
int maxBuffCount =
conf.getInt(MAX_BUFFER_COUNT_KEY, conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT) * bufsForTwoMB * 2);
int minSizeForReservoirUse = conf.getInt(MIN_ALLOCATE_SIZE_KEY, poolBufSize / 6);
return new ByteBuffAllocator(true, maxBuffCount, poolBufSize, minSizeForReservoirUse);
} else {
return new ByteBuffAllocator(false, 0, poolBufSize, Integer.MAX_VALUE);
}
}
/**
* Initialize an {@link ByteBuffAllocator} which only allocate ByteBuffer from on-heap, it's
* designed for testing purpose or disabled reservoir case.
* @return allocator to allocate on-heap ByteBuffer.
*/
public static ByteBuffAllocator createOnHeap() {
return new ByteBuffAllocator(false, 0, DEFAULT_BUFFER_SIZE, Integer.MAX_VALUE);
}
@VisibleForTesting
ByteBuffAllocator(boolean reservoirEnabled, int maxBufCount, int bufSize,
int minSizeForReservoirUse) {
this.reservoirEnabled = reservoirEnabled;
this.maxBufCount = maxBufCount;
this.bufSize = bufSize;
this.minSizeForReservoirUse = minSizeForReservoirUse;
}
public boolean isReservoirEnabled() {
return reservoirEnabled;
}
@VisibleForTesting
public int getQueueSize() {
return this.buffers.size();
}
/**
* Allocate an buffer with buffer size from ByteBuffAllocator, Note to call the
* {@link ByteBuff#release()} if no need any more, otherwise the memory leak happen in NIO
* ByteBuffer pool.
* @return an ByteBuff with the buffer size.
*/
public SingleByteBuff allocateOneBuffer() {
if (isReservoirEnabled()) {
ByteBuffer bb = getBuffer();
if (bb != null) {
return new SingleByteBuff(() -> putbackBuffer(bb), bb);
}
}
// Allocated from heap, let the JVM free its memory.
return new SingleByteBuff(NONE, ByteBuffer.allocate(this.bufSize));
}
/**
* Allocate size bytes from the ByteBufAllocator, Note to call the {@link ByteBuff#release()} if
* no need any more, otherwise the memory leak happen in NIO ByteBuffer pool.
* @param size to allocate
* @return an ByteBuff with the desired size.
*/
public ByteBuff allocate(int size) {
if (size < 0) {
throw new IllegalArgumentException("size to allocate should >=0");
}
// If disabled the reservoir, just allocate it from on-heap.
if (!isReservoirEnabled() || size == 0) {
return new SingleByteBuff(NONE, ByteBuffer.allocate(size));
}
int reminder = size % bufSize;
int len = size / bufSize + (reminder > 0 ? 1 : 0);
List<ByteBuffer> bbs = new ArrayList<>(len);
// Allocate from ByteBufferPool until the remaining is less than minSizeForReservoirUse or
// reservoir is exhausted.
int remain = size;
while (remain >= minSizeForReservoirUse) {
ByteBuffer bb = this.getBuffer();
if (bb == null) {
break;
}
bbs.add(bb);
remain -= bufSize;
}
int lenFromReservoir = bbs.size();
if (remain > 0) {
// If the last ByteBuffer is too small or the reservoir can not provide more ByteBuffers, we
// just allocate the ByteBuffer from on-heap.
bbs.add(ByteBuffer.allocate(remain));
}
ByteBuff bb = wrap(bbs, () -> {
for (int i = 0; i < lenFromReservoir; i++) {
this.putbackBuffer(bbs.get(i));
}
});
bb.limit(size);
return bb;
}
public static ByteBuff wrap(ByteBuffer[] buffers, Recycler recycler) {
if (buffers == null || buffers.length == 0) {
throw new IllegalArgumentException("buffers shouldn't be null or empty");
}
return buffers.length == 1 ? new SingleByteBuff(recycler, buffers[0])
: new MultiByteBuff(recycler, buffers);
}
public static ByteBuff wrap(ByteBuffer[] buffers) {
return wrap(buffers, NONE);
}
public static ByteBuff wrap(List<ByteBuffer> buffers, Recycler recycler) {
if (buffers == null || buffers.size() == 0) {
throw new IllegalArgumentException("buffers shouldn't be null or empty");
}
return buffers.size() == 1 ? new SingleByteBuff(recycler, buffers.get(0))
: new MultiByteBuff(recycler, buffers.toArray(new ByteBuffer[0]));
}
public static ByteBuff wrap(List<ByteBuffer> buffers) {
return wrap(buffers, NONE);
}
/**
* @return One free DirectByteBuffer from the pool. If no free ByteBuffer and we have not reached
* the maximum pool size, it will create a new one and return. In case of max pool size
* also reached, will return null. When pool returned a ByteBuffer, make sure to return it
* back to pool after use.
*/
private ByteBuffer getBuffer() {
ByteBuffer bb = buffers.poll();
if (bb != null) {
// To reset the limit to capacity and position to 0, must clear here.
bb.clear();
return bb;
}
while (true) {
int c = this.usedBufCount.intValue();
if (c >= this.maxBufCount) {
if (!maxPoolSizeInfoLevelLogged) {
LOG.info("Pool already reached its max capacity : {} and no free buffers now. Consider "
+ "increasing the value for '{}' ?",
maxBufCount, MAX_BUFFER_COUNT_KEY);
maxPoolSizeInfoLevelLogged = true;
}
return null;
}
if (!this.usedBufCount.compareAndSet(c, c + 1)) {
continue;
}
return ByteBuffer.allocateDirect(bufSize);
}
}
/**
* Return back a ByteBuffer after its use. Don't read/write the ByteBuffer after the returning.
* @param buf ByteBuffer to return.
*/
private void putbackBuffer(ByteBuffer buf) {
if (buf.capacity() != bufSize || (reservoirEnabled ^ buf.isDirect())) {
LOG.warn("Trying to put a buffer, not created by this pool! Will be just ignored");
return;
}
buffers.offer(buf);
}
}

View File

@ -23,6 +23,8 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.nio.SingleByteBuff;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
@ -39,18 +41,17 @@ import org.slf4j.LoggerFactory;
public class ByteBufferListOutputStream extends ByteBufferOutputStream {
private static final Logger LOG = LoggerFactory.getLogger(ByteBufferListOutputStream.class);
private ByteBufferPool pool;
private ByteBuffAllocator allocator;
// Keep track of the BBs where bytes written to. We will first try to get a BB from the pool. If
// it is not available will make a new one our own and keep writing to that. We keep track of all
// the BBs that we got from pool, separately so that on closeAndPutbackBuffers, we can make sure
// to return back all of them to pool
protected List<ByteBuffer> allBufs = new ArrayList<>();
protected List<ByteBuffer> bufsFromPool = new ArrayList<>();
protected List<SingleByteBuff> allBufs = new ArrayList<>();
private boolean lastBufFlipped = false;// Indicate whether the curBuf/lastBuf is flipped already
public ByteBufferListOutputStream(ByteBufferPool pool) {
this.pool = pool;
public ByteBufferListOutputStream(ByteBuffAllocator allocator) {
this.allocator = allocator;
allocateNewBuffer();
}
@ -58,18 +59,10 @@ public class ByteBufferListOutputStream extends ByteBufferOutputStream {
if (this.curBuf != null) {
this.curBuf.flip();// On the current buf set limit = pos and pos = 0.
}
// Get an initial BB to work with from the pool
this.curBuf = this.pool.getBuffer();
if (this.curBuf == null) {
// No free BB at this moment. Make a new one. The pool returns off heap BBs. Don't make off
// heap BB on demand. It is difficult to account for all such and so proper sizing of Max
// direct heap size. See HBASE-15525 also for more details.
// Make BB with same size of pool's buffer size.
this.curBuf = ByteBuffer.allocate(this.pool.getBufferSize());
} else {
this.bufsFromPool.add(this.curBuf);
}
this.allBufs.add(this.curBuf);
// Get an initial ByteBuffer from the allocator.
SingleByteBuff sbb = allocator.allocateOneBuffer();
this.curBuf = sbb.nioByteBuffers()[0];
this.allBufs.add(sbb);
}
@Override
@ -118,11 +111,8 @@ public class ByteBufferListOutputStream extends ByteBufferOutputStream {
LOG.debug(e.toString(), e);
}
// Return back all the BBs to pool
if (this.bufsFromPool != null) {
for (int i = 0; i < this.bufsFromPool.size(); i++) {
this.pool.putbackBuffer(this.bufsFromPool.get(i));
}
this.bufsFromPool = null;
for (ByteBuff buf : this.allBufs) {
buf.release();
}
this.allBufs = null;
this.curBuf = null;
@ -144,7 +134,11 @@ public class ByteBufferListOutputStream extends ByteBufferOutputStream {
// All the other BBs are already flipped while moving to the new BB.
curBuf.flip();
}
return this.allBufs;
List<ByteBuffer> bbs = new ArrayList<>(this.allBufs.size());
for (SingleByteBuff bb : this.allBufs) {
bbs.add(bb.nioByteBuffers()[0]);
}
return bbs;
}
@Override

View File

@ -1,155 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io;
import java.nio.ByteBuffer;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
/**
* Like Hadoops' ByteBufferPool only you do not specify desired size when getting a ByteBuffer. This
* pool keeps an upper bound on the count of ByteBuffers in the pool and a fixed size of ByteBuffer
* that it will create. When requested, if a free ByteBuffer is already present, it will return
* that. And when no free ByteBuffer available and we are below the max count, it will create a new
* one and return that.
*
* <p>
* Note: This pool returns off heap ByteBuffers by default. If on heap ByteBuffers to be pooled,
* pass 'directByteBuffer' as false while construction of the pool.
* <p>
* This class is thread safe.
*
* @see ByteBufferListOutputStream
*/
@InterfaceAudience.Private
public class ByteBufferPool {
private static final Logger LOG = LoggerFactory.getLogger(ByteBufferPool.class);
// TODO better config names?
// hbase.ipc.server.reservoir.initial.max -> hbase.ipc.server.reservoir.max.buffer.count
// hbase.ipc.server.reservoir.initial.buffer.size -> hbase.ipc.server.reservoir.buffer.size
public static final String MAX_POOL_SIZE_KEY = "hbase.ipc.server.reservoir.initial.max";
public static final String BUFFER_SIZE_KEY = "hbase.ipc.server.reservoir.initial.buffer.size";
public static final int DEFAULT_BUFFER_SIZE = 64 * 1024;// 64 KB. Making it same as the chunk size
// what we will write/read to/from the
// socket channel.
private final Queue<ByteBuffer> buffers = new ConcurrentLinkedQueue<>();
private final int bufferSize;
private final int maxPoolSize;
private AtomicInteger count; // Count of the BBs created already for this pool.
private final boolean directByteBuffer; //Whether this pool should return DirectByteBuffers
private boolean maxPoolSizeInfoLevelLogged = false;
/**
* @param bufferSize Size of each buffer created by this pool.
* @param maxPoolSize Max number of buffers to keep in this pool.
*/
public ByteBufferPool(int bufferSize, int maxPoolSize) {
this(bufferSize, maxPoolSize, true);
}
/**
* @param bufferSize Size of each buffer created by this pool.
* @param maxPoolSize Max number of buffers to keep in this pool.
* @param directByteBuffer Whether to create direct ByteBuffer or on heap ByteBuffer.
*/
public ByteBufferPool(int bufferSize, int maxPoolSize, boolean directByteBuffer) {
this.bufferSize = bufferSize;
this.maxPoolSize = maxPoolSize;
this.directByteBuffer = directByteBuffer;
// TODO can add initialPoolSize config also and make those many BBs ready for use.
LOG.info("Created with bufferSize={} and maxPoolSize={}",
org.apache.hadoop.util.StringUtils.byteDesc(bufferSize),
org.apache.hadoop.util.StringUtils.byteDesc(maxPoolSize));
this.count = new AtomicInteger(0);
}
/**
* @return One free ByteBuffer from the pool. If no free ByteBuffer and we have not reached the
* maximum pool size, it will create a new one and return. In case of max pool size also
* reached, will return null. When pool returned a ByteBuffer, make sure to return it back
* to pool after use.
* @see #putbackBuffer(ByteBuffer)
*/
public ByteBuffer getBuffer() {
ByteBuffer bb = buffers.poll();
if (bb != null) {
// Clear sets limit == capacity. Position == 0.
bb.clear();
return bb;
}
while (true) {
int c = this.count.intValue();
if (c >= this.maxPoolSize) {
if (maxPoolSizeInfoLevelLogged) {
if (LOG.isDebugEnabled()) {
LOG.debug("Pool already reached its max capacity : " + this.maxPoolSize
+ " and no free buffers now. Consider increasing the value for '"
+ MAX_POOL_SIZE_KEY + "' ?");
}
} else {
LOG.info("Pool already reached its max capacity : " + this.maxPoolSize
+ " and no free buffers now. Consider increasing the value for '" + MAX_POOL_SIZE_KEY
+ "' ?");
maxPoolSizeInfoLevelLogged = true;
}
return null;
}
if (!this.count.compareAndSet(c, c + 1)) {
continue;
}
if (LOG.isTraceEnabled()) {
LOG.trace("Creating a new offheap ByteBuffer of size: " + this.bufferSize);
}
return this.directByteBuffer ? ByteBuffer.allocateDirect(this.bufferSize)
: ByteBuffer.allocate(this.bufferSize);
}
}
/**
* Return back a ByteBuffer after its use. Do not try to return put back a ByteBuffer, not
* obtained from this pool.
* @param buf ByteBuffer to return.
*/
public void putbackBuffer(ByteBuffer buf) {
if (buf.capacity() != this.bufferSize || (this.directByteBuffer ^ buf.isDirect())) {
LOG.warn("Trying to put a buffer, not created by this pool! Will be just ignored");
return;
}
buffers.offer(buf);
}
public int getBufferSize() {
return this.bufferSize;
}
/**
* @return Number of free buffers
*/
@VisibleForTesting
public int getQueueSize() {
return buffers.size();
}
}

View File

@ -98,7 +98,7 @@ public class CopyKeyDataBlockEncoder extends BufferedDataBlockEncoder {
currentBuffer.skip(current.tagsLength);
}
if (includesMvcc()) {
current.memstoreTS = ByteBuff.readVLong(currentBuffer);
current.memstoreTS = ByteBufferUtils.readVLong(currentBuffer);
} else {
current.memstoreTS = 0;
}

View File

@ -477,7 +477,7 @@ public class DiffKeyDeltaEncoder extends BufferedDataBlockEncoder {
decodeTags();
}
if (includesMvcc()) {
current.memstoreTS = ByteBuff.readVLong(currentBuffer);
current.memstoreTS = ByteBufferUtils.readVLong(currentBuffer);
} else {
current.memstoreTS = 0;
}

View File

@ -501,7 +501,7 @@ public class FastDiffDeltaEncoder extends BufferedDataBlockEncoder {
decodeTags();
}
if (includesMvcc()) {
current.memstoreTS = ByteBuff.readVLong(currentBuffer);
current.memstoreTS = ByteBufferUtils.readVLong(currentBuffer);
} else {
current.memstoreTS = 0;
}

View File

@ -213,7 +213,7 @@ public class PrefixKeyDeltaEncoder extends BufferedDataBlockEncoder {
decodeTags();
}
if (includesMvcc()) {
current.memstoreTS = ByteBuff.readVLong(currentBuffer);
current.memstoreTS = ByteBufferUtils.readVLong(currentBuffer);
} else {
current.memstoreTS = 0;
}

View File

@ -282,7 +282,7 @@ public class RowIndexSeekerV1 extends AbstractEncodedSeeker {
decodeTags();
}
if (includesMvcc()) {
current.memstoreTS = ByteBuff.readVLong(currentBuffer);
current.memstoreTS = ByteBufferUtils.readVLong(currentBuffer);
} else {
current.memstoreTS = 0;
}

View File

@ -24,22 +24,81 @@ import java.nio.channels.ReadableByteChannel;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ObjectIntPair;
import org.apache.hadoop.io.WritableUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.io.netty.util.ReferenceCounted;
import org.apache.hbase.thirdparty.io.netty.util.internal.ObjectUtil;
/**
* An abstract class that abstracts out as to how the byte buffers are used,
* either single or multiple. We have this interface because the java's ByteBuffers
* cannot be sub-classed. This class provides APIs similar to the ones provided
* in java's nio ByteBuffers and allows you to do positional reads/writes and relative
* reads and writes on the underlying BB. In addition to it, we have some additional APIs which
* helps us in the read path.
* An abstract class that abstracts out as to how the byte buffers are used, either single or
* multiple. We have this interface because the java's ByteBuffers cannot be sub-classed. This class
* provides APIs similar to the ones provided in java's nio ByteBuffers and allows you to do
* positional reads/writes and relative reads and writes on the underlying BB. In addition to it, we
* have some additional APIs which helps us in the read path. <br/>
* The ByteBuff implement {@link ReferenceCounted} interface which mean need to maintains a
* {@link RefCnt} inside, if ensure that the ByteBuff won't be used any more, we must do a
* {@link ByteBuff#release()} to recycle its NIO ByteBuffers. when considering the
* {@link ByteBuff#duplicate()} or {@link ByteBuff#slice()}, releasing either the duplicated one or
* the original one will free its memory, because they share the same NIO ByteBuffers. when you want
* to retain the NIO ByteBuffers even if the origin one called {@link ByteBuff#release()}, you can
* do like this:
*
* <pre>
* ByteBuff original = ...;
* ByteBuff dup = original.duplicate();
* dup.retain();
* original.release();
* // The NIO buffers can still be accessed unless you release the duplicated one
* dup.get(...);
* dup.release();
* // Both the original and dup can not access the NIO buffers any more.
* </pre>
*/
@InterfaceAudience.Private
// TODO to have another name. This can easily get confused with netty's ByteBuf
public abstract class ByteBuff {
public abstract class ByteBuff implements ReferenceCounted {
private static final String REFERENCE_COUNT_NAME = "ReferenceCount";
private static final int NIO_BUFFER_LIMIT = 64 * 1024; // should not be more than 64KB.
protected RefCnt refCnt;
/*************************** Methods for reference count **********************************/
protected void checkRefCount() {
ObjectUtil.checkPositive(refCnt(), REFERENCE_COUNT_NAME);
}
public int refCnt() {
return refCnt.refCnt();
}
@Override
public boolean release() {
return refCnt.release();
}
@Override
public final ByteBuff retain(int increment) {
throw new UnsupportedOperationException();
}
@Override
public final boolean release(int increment) {
throw new UnsupportedOperationException();
}
@Override
public final ByteBuff touch() {
throw new UnsupportedOperationException();
}
@Override
public final ByteBuff touch(Object hint) {
throw new UnsupportedOperationException();
}
/******************************* Methods for ByteBuff **************************************/
/**
* @return this ByteBuff's current position
*/
@ -491,78 +550,11 @@ public abstract class ByteBuff {
return tmpLength;
}
/**
* Similar to {@link WritableUtils#readVLong(java.io.DataInput)} but reads from a
* {@link ByteBuff}.
*/
public static long readVLong(ByteBuff in) {
byte firstByte = in.get();
int len = WritableUtils.decodeVIntSize(firstByte);
if (len == 1) {
return firstByte;
}
long i = 0;
for (int idx = 0; idx < len-1; idx++) {
byte b = in.get();
i = i << 8;
i = i | (b & 0xFF);
}
return (WritableUtils.isNegativeVInt(firstByte) ? (i ^ -1L) : i);
}
/**
* Search sorted array "a" for byte "key".
*
* @param a Array to search. Entries must be sorted and unique.
* @param fromIndex First index inclusive of "a" to include in the search.
* @param toIndex Last index exclusive of "a" to include in the search.
* @param key The byte to search for.
* @return The index of key if found. If not found, return -(index + 1), where
* negative indicates "not found" and the "index + 1" handles the "-0"
* case.
*/
public static int unsignedBinarySearch(ByteBuff a, int fromIndex, int toIndex, byte key) {
int unsignedKey = key & 0xff;
int low = fromIndex;
int high = toIndex - 1;
while (low <= high) {
int mid = low + ((high - low) >> 1);
int midVal = a.get(mid) & 0xff;
if (midVal < unsignedKey) {
low = mid + 1;
} else if (midVal > unsignedKey) {
high = mid - 1;
} else {
return mid; // key found
}
}
return -(low + 1); // key not found.
}
public abstract ByteBuffer[] nioByteBuffers();
@Override
public String toString() {
return this.getClass().getSimpleName() + "[pos=" + position() + ", lim=" + limit() +
", cap= " + capacity() + "]";
}
public static String toStringBinary(final ByteBuff b, int off, int len) {
StringBuilder result = new StringBuilder();
// Just in case we are passed a 'len' that is > buffer length...
if (off >= b.capacity())
return result.toString();
if (off + len > b.capacity())
len = b.capacity() - off;
for (int i = off; i < off + len; ++i) {
int ch = b.get(i) & 0xFF;
if ((ch >= '0' && ch <= '9') || (ch >= 'A' && ch <= 'Z') || (ch >= 'a' && ch <= 'z')
|| " `~!@#$%^&*()-_=+[]{}|;:'\",.<>/?".indexOf(ch) >= 0) {
result.append((char) ch);
} else {
result.append(String.format("\\x%02X", ch));
}
}
return result.toString();
}
}

View File

@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hbase.nio;
import static org.apache.hadoop.hbase.io.ByteBuffAllocator.NONE;
import java.io.IOException;
import java.nio.BufferOverflowException;
import java.nio.BufferUnderflowException;
@ -24,13 +26,12 @@ import java.nio.ByteBuffer;
import java.nio.InvalidMarkException;
import java.nio.channels.ReadableByteChannel;
import org.apache.hadoop.hbase.io.ByteBuffAllocator.Recycler;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ObjectIntPair;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
/**
* Provides a unified view of all the underlying ByteBuffers and will look as if a bigger
* sequential buffer. This class provides similar APIs as in {@link ByteBuffer} to put/get int,
@ -53,6 +54,15 @@ public class MultiByteBuff extends ByteBuff {
private final int[] itemBeginPos;
public MultiByteBuff(ByteBuffer... items) {
this(NONE, items);
}
public MultiByteBuff(Recycler recycler, ByteBuffer... items) {
this(new RefCnt(recycler), items);
}
private MultiByteBuff(RefCnt refCnt, ByteBuffer... items) {
this.refCnt = refCnt;
assert items != null;
assert items.length > 0;
this.items = items;
@ -75,8 +85,9 @@ public class MultiByteBuff extends ByteBuff {
this.limitedItemIndex = this.items.length - 1;
}
private MultiByteBuff(ByteBuffer[] items, int[] itemBeginPos, int limit, int limitedIndex,
int curItemIndex, int markedIndex) {
private MultiByteBuff(RefCnt refCnt, ByteBuffer[] items, int[] itemBeginPos, int limit,
int limitedIndex, int curItemIndex, int markedIndex) {
this.refCnt = refCnt;
this.items = items;
this.curItemIndex = curItemIndex;
this.curItem = this.items[this.curItemIndex];
@ -117,6 +128,7 @@ public class MultiByteBuff extends ByteBuff {
*/
@Override
public int capacity() {
checkRefCount();
int c = 0;
for (ByteBuffer item : this.items) {
c += item.capacity();
@ -131,12 +143,14 @@ public class MultiByteBuff extends ByteBuff {
*/
@Override
public byte get(int index) {
checkRefCount();
int itemIndex = getItemIndex(index);
return ByteBufferUtils.toByte(this.items[itemIndex], index - this.itemBeginPos[itemIndex]);
}
@Override
public byte getByteAfterPosition(int offset) {
checkRefCount();
// Mostly the index specified will land within this current item. Short circuit for that
int index = offset + this.position();
int itemIndex = getItemIndexFromCurItemIndex(index);
@ -179,6 +193,7 @@ public class MultiByteBuff extends ByteBuff {
*/
@Override
public int getInt(int index) {
checkRefCount();
// Mostly the index specified will land within this current item. Short circuit for that
int itemIndex;
if (this.itemBeginPos[this.curItemIndex] <= index
@ -192,6 +207,7 @@ public class MultiByteBuff extends ByteBuff {
@Override
public int getIntAfterPosition(int offset) {
checkRefCount();
// Mostly the index specified will land within this current item. Short circuit for that
int index = offset + this.position();
int itemIndex;
@ -210,6 +226,7 @@ public class MultiByteBuff extends ByteBuff {
*/
@Override
public short getShort(int index) {
checkRefCount();
// Mostly the index specified will land within this current item. Short circuit for that
int itemIndex;
if (this.itemBeginPos[this.curItemIndex] <= index
@ -238,6 +255,7 @@ public class MultiByteBuff extends ByteBuff {
@Override
public short getShortAfterPosition(int offset) {
checkRefCount();
// Mostly the index specified will land within this current item. Short circuit for that
int index = offset + this.position();
int itemIndex;
@ -319,6 +337,7 @@ public class MultiByteBuff extends ByteBuff {
*/
@Override
public long getLong(int index) {
checkRefCount();
// Mostly the index specified will land within this current item. Short circuit for that
int itemIndex;
if (this.itemBeginPos[this.curItemIndex] <= index
@ -332,6 +351,7 @@ public class MultiByteBuff extends ByteBuff {
@Override
public long getLongAfterPosition(int offset) {
checkRefCount();
// Mostly the index specified will land within this current item. Short circuit for that
int index = offset + this.position();
int itemIndex;
@ -348,6 +368,7 @@ public class MultiByteBuff extends ByteBuff {
*/
@Override
public int position() {
checkRefCount();
return itemBeginPos[this.curItemIndex] + this.curItem.position();
}
@ -358,6 +379,7 @@ public class MultiByteBuff extends ByteBuff {
*/
@Override
public MultiByteBuff position(int position) {
checkRefCount();
// Short circuit for positioning within the cur item. Mostly that is the case.
if (this.itemBeginPos[this.curItemIndex] <= position
&& this.itemBeginPos[this.curItemIndex + 1] > position) {
@ -385,6 +407,7 @@ public class MultiByteBuff extends ByteBuff {
*/
@Override
public MultiByteBuff rewind() {
checkRefCount();
for (int i = 0; i < this.items.length; i++) {
this.items[i].rewind();
}
@ -400,6 +423,7 @@ public class MultiByteBuff extends ByteBuff {
*/
@Override
public MultiByteBuff mark() {
checkRefCount();
this.markedItemIndex = this.curItemIndex;
this.curItem.mark();
return this;
@ -412,6 +436,7 @@ public class MultiByteBuff extends ByteBuff {
*/
@Override
public MultiByteBuff reset() {
checkRefCount();
// when the buffer is moved to the next one.. the reset should happen on the previous marked
// item and the new one should be taken as the base
if (this.markedItemIndex < 0) throw new InvalidMarkException();
@ -433,6 +458,7 @@ public class MultiByteBuff extends ByteBuff {
*/
@Override
public int remaining() {
checkRefCount();
int remain = 0;
for (int i = curItemIndex; i < items.length; i++) {
remain += items[i].remaining();
@ -446,6 +472,7 @@ public class MultiByteBuff extends ByteBuff {
*/
@Override
public final boolean hasRemaining() {
checkRefCount();
return this.curItem.hasRemaining() || (this.curItemIndex < this.limitedItemIndex
&& this.items[this.curItemIndex + 1].hasRemaining());
}
@ -457,6 +484,7 @@ public class MultiByteBuff extends ByteBuff {
*/
@Override
public byte get() {
checkRefCount();
if (this.curItem.remaining() == 0) {
if (items.length - 1 == this.curItemIndex) {
// means cur item is the last one and we wont be able to read a long. Throw exception
@ -476,6 +504,7 @@ public class MultiByteBuff extends ByteBuff {
*/
@Override
public short getShort() {
checkRefCount();
int remaining = this.curItem.remaining();
if (remaining >= Bytes.SIZEOF_SHORT) {
return this.curItem.getShort();
@ -494,6 +523,7 @@ public class MultiByteBuff extends ByteBuff {
*/
@Override
public int getInt() {
checkRefCount();
int remaining = this.curItem.remaining();
if (remaining >= Bytes.SIZEOF_INT) {
return this.curItem.getInt();
@ -514,6 +544,7 @@ public class MultiByteBuff extends ByteBuff {
*/
@Override
public long getLong() {
checkRefCount();
int remaining = this.curItem.remaining();
if (remaining >= Bytes.SIZEOF_LONG) {
return this.curItem.getLong();
@ -545,6 +576,7 @@ public class MultiByteBuff extends ByteBuff {
*/
@Override
public void get(byte[] dst, int offset, int length) {
checkRefCount();
while (length > 0) {
int toRead = Math.min(length, this.curItem.remaining());
ByteBufferUtils.copyFromBufferToArray(dst, this.curItem, this.curItem.position(), offset,
@ -560,6 +592,7 @@ public class MultiByteBuff extends ByteBuff {
@Override
public void get(int sourceOffset, byte[] dst, int offset, int length) {
checkRefCount();
int itemIndex = getItemIndex(sourceOffset);
ByteBuffer item = this.items[itemIndex];
sourceOffset = sourceOffset - this.itemBeginPos[itemIndex];
@ -583,6 +616,7 @@ public class MultiByteBuff extends ByteBuff {
*/
@Override
public MultiByteBuff limit(int limit) {
checkRefCount();
this.limit = limit;
// Normally the limit will try to limit within the last BB item
int limitedIndexBegin = this.itemBeginPos[this.limitedItemIndex];
@ -622,29 +656,30 @@ public class MultiByteBuff extends ByteBuff {
*/
@Override
public MultiByteBuff slice() {
checkRefCount();
ByteBuffer[] copy = new ByteBuffer[this.limitedItemIndex - this.curItemIndex + 1];
for (int i = curItemIndex, j = 0; i <= this.limitedItemIndex; i++, j++) {
copy[j] = this.items[i].slice();
}
return new MultiByteBuff(copy);
return new MultiByteBuff(refCnt, copy);
}
/**
* Returns an MBB which is a duplicate version of this MBB. The position, limit and mark
* of the new MBB will be independent than that of the original MBB.
* The content of the new MBB will start at this MBB's current position
* The position, limit and mark of the new MBB would be identical to this MBB in terms of
* values.
* @return a sliced MBB
* Returns an MBB which is a duplicate version of this MBB. The position, limit and mark of the
* new MBB will be independent than that of the original MBB. The content of the new MBB will
* start at this MBB's current position The position, limit and mark of the new MBB would be
* identical to this MBB in terms of values.
* @return a duplicated MBB
*/
@Override
public MultiByteBuff duplicate() {
checkRefCount();
ByteBuffer[] itemsCopy = new ByteBuffer[this.items.length];
for (int i = 0; i < this.items.length; i++) {
itemsCopy[i] = items[i].duplicate();
}
return new MultiByteBuff(itemsCopy, this.itemBeginPos, this.limit, this.limitedItemIndex,
this.curItemIndex, this.markedItemIndex);
return new MultiByteBuff(refCnt, itemsCopy, this.itemBeginPos, this.limit,
this.limitedItemIndex, this.curItemIndex, this.markedItemIndex);
}
/**
@ -654,6 +689,7 @@ public class MultiByteBuff extends ByteBuff {
*/
@Override
public MultiByteBuff put(byte b) {
checkRefCount();
if (this.curItem.remaining() == 0) {
if (this.curItemIndex == this.items.length - 1) {
throw new BufferOverflowException();
@ -673,6 +709,7 @@ public class MultiByteBuff extends ByteBuff {
*/
@Override
public MultiByteBuff put(int index, byte b) {
checkRefCount();
int itemIndex = getItemIndex(limit);
ByteBuffer item = items[itemIndex];
item.put(index - itemBeginPos[itemIndex], b);
@ -688,6 +725,7 @@ public class MultiByteBuff extends ByteBuff {
*/
@Override
public MultiByteBuff put(int offset, ByteBuff src, int srcOffset, int length) {
checkRefCount();
int destItemIndex = getItemIndex(offset);
int srcItemIndex = getItemIndex(srcOffset);
ByteBuffer destItem = this.items[destItemIndex];
@ -723,7 +761,7 @@ public class MultiByteBuff extends ByteBuff {
}
private static ByteBuffer getItemByteBuffer(ByteBuff buf, int index) {
return (buf instanceof SingleByteBuff) ? ((SingleByteBuff) buf).getEnclosingByteBuffer()
return (buf instanceof SingleByteBuff) ? buf.nioByteBuffers()[0]
: ((MultiByteBuff) buf).items[index];
}
@ -734,6 +772,7 @@ public class MultiByteBuff extends ByteBuff {
*/
@Override
public MultiByteBuff putInt(int val) {
checkRefCount();
if (this.curItem.remaining() >= Bytes.SIZEOF_INT) {
this.curItem.putInt(val);
return this;
@ -784,6 +823,7 @@ public class MultiByteBuff extends ByteBuff {
*/
@Override
public MultiByteBuff put(byte[] src, int offset, int length) {
checkRefCount();
if (this.curItem.remaining() >= length) {
ByteBufferUtils.copyFromArrayToBuffer(this.curItem, src, offset, length);
return this;
@ -803,6 +843,7 @@ public class MultiByteBuff extends ByteBuff {
*/
@Override
public MultiByteBuff putLong(long val) {
checkRefCount();
if (this.curItem.remaining() >= Bytes.SIZEOF_LONG) {
this.curItem.putLong(val);
return this;
@ -860,6 +901,7 @@ public class MultiByteBuff extends ByteBuff {
*/
@Override
public MultiByteBuff skip(int length) {
checkRefCount();
// Get available bytes from this item and remaining from next
int jump = 0;
while (true) {
@ -882,6 +924,7 @@ public class MultiByteBuff extends ByteBuff {
*/
@Override
public MultiByteBuff moveBack(int length) {
checkRefCount();
while (length != 0) {
if (length > curItem.position()) {
length -= curItem.position();
@ -909,6 +952,7 @@ public class MultiByteBuff extends ByteBuff {
*/
@Override
public ByteBuffer asSubByteBuffer(int length) {
checkRefCount();
if (this.curItem.remaining() >= length) {
return this.curItem;
}
@ -918,8 +962,8 @@ public class MultiByteBuff extends ByteBuff {
ByteBuffer locCurItem = curItem;
while (length > 0) {
int toRead = Math.min(length, locCurItem.remaining());
ByteBufferUtils
.copyFromBufferToArray(dupB, locCurItem, locCurItem.position(), offset, toRead);
ByteBufferUtils.copyFromBufferToArray(dupB, locCurItem, locCurItem.position(), offset,
toRead);
length -= toRead;
if (length == 0) break;
locCurItemIndex++;
@ -945,6 +989,7 @@ public class MultiByteBuff extends ByteBuff {
*/
@Override
public void asSubByteBuffer(int offset, int length, ObjectIntPair<ByteBuffer> pair) {
checkRefCount();
if (this.itemBeginPos[this.curItemIndex] <= offset) {
int relOffsetInCurItem = offset - this.itemBeginPos[this.curItemIndex];
if (this.curItem.limit() - relOffsetInCurItem >= length) {
@ -988,6 +1033,7 @@ public class MultiByteBuff extends ByteBuff {
@Override
public void get(ByteBuffer out, int sourceOffset,
int length) {
checkRefCount();
// Not used from real read path actually. So not going with
// optimization
for (int i = 0; i < length; ++i) {
@ -1007,6 +1053,7 @@ public class MultiByteBuff extends ByteBuff {
*/
@Override
public byte[] toBytes(int offset, int length) {
checkRefCount();
byte[] output = new byte[length];
this.get(offset, output, 0, length);
return output;
@ -1014,6 +1061,7 @@ public class MultiByteBuff extends ByteBuff {
@Override
public int read(ReadableByteChannel channel) throws IOException {
checkRefCount();
int total = 0;
while (true) {
// Read max possible into the current BB
@ -1033,6 +1081,12 @@ public class MultiByteBuff extends ByteBuff {
return total;
}
@Override
public ByteBuffer[] nioByteBuffers() {
checkRefCount();
return this.items;
}
@Override
public boolean equals(Object obj) {
if (!(obj instanceof MultiByteBuff)) return false;
@ -1040,7 +1094,7 @@ public class MultiByteBuff extends ByteBuff {
MultiByteBuff that = (MultiByteBuff) obj;
if (this.capacity() != that.capacity()) return false;
if (ByteBuff.compareTo(this, this.position(), this.limit(), that, that.position(),
that.limit()) == 0) {
that.limit()) == 0) {
return true;
}
return false;
@ -1055,11 +1109,9 @@ public class MultiByteBuff extends ByteBuff {
return hash;
}
/**
* @return the ByteBuffers which this wraps.
*/
@VisibleForTesting
public ByteBuffer[] getEnclosingByteBuffers() {
return this.items;
@Override
public MultiByteBuff retain() {
refCnt.retain();
return this;
}
}

View File

@ -0,0 +1,49 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.nio;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.io.ByteBuffAllocator.Recycler;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.io.netty.util.AbstractReferenceCounted;
import org.apache.hbase.thirdparty.io.netty.util.ReferenceCounted;
/**
* Maintain an reference count integer inside to track life cycle of {@link ByteBuff}, if the
* reference count become 0, it'll call {@link Recycler#free()} once.
*/
@InterfaceAudience.Private
class RefCnt extends AbstractReferenceCounted {
private Recycler recycler = ByteBuffAllocator.NONE;
RefCnt(Recycler recycler) {
this.recycler = recycler;
}
@Override
protected final void deallocate() {
this.recycler.free();
}
@Override
public final ReferenceCounted touch(Object hint) {
throw new UnsupportedOperationException();
}
}

View File

@ -17,22 +17,24 @@
*/
package org.apache.hadoop.hbase.nio;
import static org.apache.hadoop.hbase.io.ByteBuffAllocator.NONE;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import org.apache.hadoop.hbase.io.ByteBuffAllocator.Recycler;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.ObjectIntPair;
import org.apache.hadoop.hbase.util.UnsafeAccess;
import org.apache.hadoop.hbase.util.UnsafeAvailChecker;
import org.apache.yetus.audience.InterfaceAudience;
import sun.nio.ch.DirectBuffer;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
/**
* An implementation of ByteBuff where a single BB backs the BBI. This just acts
* as a wrapper over a normal BB - offheap or onheap
* An implementation of ByteBuff where a single BB backs the BBI. This just acts as a wrapper over a
* normal BB - offheap or onheap
*/
@InterfaceAudience.Private
public class SingleByteBuff extends ByteBuff {
@ -48,6 +50,15 @@ public class SingleByteBuff extends ByteBuff {
private Object unsafeRef = null;
public SingleByteBuff(ByteBuffer buf) {
this(NONE, buf);
}
public SingleByteBuff(Recycler recycler, ByteBuffer buf) {
this(new RefCnt(recycler), buf);
}
private SingleByteBuff(RefCnt refCnt, ByteBuffer buf) {
this.refCnt = refCnt;
this.buf = buf;
if (buf.hasArray()) {
this.unsafeOffset = UnsafeAccess.BYTE_ARRAY_BASE_OFFSET + buf.arrayOffset();
@ -59,63 +70,74 @@ public class SingleByteBuff extends ByteBuff {
@Override
public int position() {
checkRefCount();
return this.buf.position();
}
@Override
public SingleByteBuff position(int position) {
checkRefCount();
this.buf.position(position);
return this;
}
@Override
public SingleByteBuff skip(int len) {
checkRefCount();
this.buf.position(this.buf.position() + len);
return this;
}
@Override
public SingleByteBuff moveBack(int len) {
checkRefCount();
this.buf.position(this.buf.position() - len);
return this;
}
@Override
public int capacity() {
checkRefCount();
return this.buf.capacity();
}
@Override
public int limit() {
checkRefCount();
return this.buf.limit();
}
@Override
public SingleByteBuff limit(int limit) {
checkRefCount();
this.buf.limit(limit);
return this;
}
@Override
public SingleByteBuff rewind() {
checkRefCount();
this.buf.rewind();
return this;
}
@Override
public SingleByteBuff mark() {
checkRefCount();
this.buf.mark();
return this;
}
@Override
public ByteBuffer asSubByteBuffer(int length) {
checkRefCount();
// Just return the single BB that is available
return this.buf;
}
@Override
public void asSubByteBuffer(int offset, int length, ObjectIntPair<ByteBuffer> pair) {
checkRefCount();
// Just return the single BB that is available
pair.setFirst(this.buf);
pair.setSecond(offset);
@ -123,37 +145,44 @@ public class SingleByteBuff extends ByteBuff {
@Override
public int remaining() {
checkRefCount();
return this.buf.remaining();
}
@Override
public boolean hasRemaining() {
checkRefCount();
return buf.hasRemaining();
}
@Override
public SingleByteBuff reset() {
checkRefCount();
this.buf.reset();
return this;
}
@Override
public SingleByteBuff slice() {
return new SingleByteBuff(this.buf.slice());
checkRefCount();
return new SingleByteBuff(this.refCnt, this.buf.slice());
}
@Override
public SingleByteBuff duplicate() {
return new SingleByteBuff(this.buf.duplicate());
checkRefCount();
return new SingleByteBuff(this.refCnt, this.buf.duplicate());
}
@Override
public byte get() {
checkRefCount();
return buf.get();
}
@Override
public byte get(int index) {
checkRefCount();
if (UNSAFE_AVAIL) {
return UnsafeAccess.toByte(this.unsafeRef, this.unsafeOffset + index);
}
@ -162,29 +191,34 @@ public class SingleByteBuff extends ByteBuff {
@Override
public byte getByteAfterPosition(int offset) {
checkRefCount();
return get(this.buf.position() + offset);
}
@Override
public SingleByteBuff put(byte b) {
checkRefCount();
this.buf.put(b);
return this;
}
@Override
public SingleByteBuff put(int index, byte b) {
checkRefCount();
buf.put(index, b);
return this;
}
@Override
public void get(byte[] dst, int offset, int length) {
checkRefCount();
ByteBufferUtils.copyFromBufferToArray(dst, buf, buf.position(), offset, length);
buf.position(buf.position() + length);
}
@Override
public void get(int sourceOffset, byte[] dst, int offset, int length) {
checkRefCount();
ByteBufferUtils.copyFromBufferToArray(dst, buf, sourceOffset, offset, length);
}
@ -195,9 +229,10 @@ public class SingleByteBuff extends ByteBuff {
@Override
public SingleByteBuff put(int offset, ByteBuff src, int srcOffset, int length) {
checkRefCount();
if (src instanceof SingleByteBuff) {
ByteBufferUtils.copyFromBufferToBuffer(((SingleByteBuff) src).buf, this.buf, srcOffset,
offset, length);
offset, length);
} else {
// TODO we can do some optimization here? Call to asSubByteBuffer might
// create a copy.
@ -205,7 +240,7 @@ public class SingleByteBuff extends ByteBuff {
src.asSubByteBuffer(srcOffset, length, pair);
if (pair.getFirst() != null) {
ByteBufferUtils.copyFromBufferToBuffer(pair.getFirst(), this.buf, pair.getSecond(), offset,
length);
length);
}
}
return this;
@ -213,37 +248,44 @@ public class SingleByteBuff extends ByteBuff {
@Override
public SingleByteBuff put(byte[] src, int offset, int length) {
checkRefCount();
ByteBufferUtils.copyFromArrayToBuffer(this.buf, src, offset, length);
return this;
}
@Override
public SingleByteBuff put(byte[] src) {
checkRefCount();
return put(src, 0, src.length);
}
@Override
public boolean hasArray() {
checkRefCount();
return this.buf.hasArray();
}
@Override
public byte[] array() {
checkRefCount();
return this.buf.array();
}
@Override
public int arrayOffset() {
checkRefCount();
return this.buf.arrayOffset();
}
@Override
public short getShort() {
checkRefCount();
return this.buf.getShort();
}
@Override
public short getShort(int index) {
checkRefCount();
if (UNSAFE_UNALIGNED) {
return UnsafeAccess.toShort(unsafeRef, unsafeOffset + index);
}
@ -252,22 +294,26 @@ public class SingleByteBuff extends ByteBuff {
@Override
public short getShortAfterPosition(int offset) {
checkRefCount();
return getShort(this.buf.position() + offset);
}
@Override
public int getInt() {
checkRefCount();
return this.buf.getInt();
}
@Override
public SingleByteBuff putInt(int value) {
checkRefCount();
ByteBufferUtils.putInt(this.buf, value);
return this;
}
@Override
public int getInt(int index) {
checkRefCount();
if (UNSAFE_UNALIGNED) {
return UnsafeAccess.toInt(unsafeRef, unsafeOffset + index);
}
@ -276,22 +322,26 @@ public class SingleByteBuff extends ByteBuff {
@Override
public int getIntAfterPosition(int offset) {
checkRefCount();
return getInt(this.buf.position() + offset);
}
@Override
public long getLong() {
checkRefCount();
return this.buf.getLong();
}
@Override
public SingleByteBuff putLong(long value) {
checkRefCount();
ByteBufferUtils.putLong(this.buf, value);
return this;
}
@Override
public long getLong(int index) {
checkRefCount();
if (UNSAFE_UNALIGNED) {
return UnsafeAccess.toLong(unsafeRef, unsafeOffset + index);
}
@ -300,11 +350,13 @@ public class SingleByteBuff extends ByteBuff {
@Override
public long getLongAfterPosition(int offset) {
checkRefCount();
return getLong(this.buf.position() + offset);
}
@Override
public byte[] toBytes(int offset, int length) {
checkRefCount();
byte[] output = new byte[length];
ByteBufferUtils.copyFromBufferToArray(output, buf, offset, 0, length);
return output;
@ -312,18 +364,28 @@ public class SingleByteBuff extends ByteBuff {
@Override
public void get(ByteBuffer out, int sourceOffset, int length) {
checkRefCount();
ByteBufferUtils.copyFromBufferToBuffer(buf, out, sourceOffset, length);
}
@Override
public int read(ReadableByteChannel channel) throws IOException {
checkRefCount();
return channelRead(channel, buf);
}
@Override
public ByteBuffer[] nioByteBuffers() {
checkRefCount();
return new ByteBuffer[] { this.buf };
}
@Override
public boolean equals(Object obj) {
if(!(obj instanceof SingleByteBuff)) return false;
return this.buf.equals(((SingleByteBuff)obj).buf);
if (!(obj instanceof SingleByteBuff)) {
return false;
}
return this.buf.equals(((SingleByteBuff) obj).buf);
}
@Override
@ -331,11 +393,9 @@ public class SingleByteBuff extends ByteBuff {
return this.buf.hashCode();
}
/**
* @return the ByteBuffer which this wraps.
*/
@VisibleForTesting
public ByteBuffer getEnclosingByteBuffer() {
return this.buf;
@Override
public SingleByteBuff retain() {
refCnt.retain();
return this;
}
}

View File

@ -27,9 +27,9 @@ import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.nio.MultiByteBuff;
import org.apache.hadoop.hbase.nio.SingleByteBuff;
import org.apache.hadoop.util.StringUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
@ -311,10 +311,6 @@ public class ByteBufferArray {
srcIndex += cnt;
}
assert srcIndex == len;
if (mbb.length > 1) {
return new MultiByteBuff(mbb);
} else {
return new SingleByteBuff(mbb[0]);
}
return ByteBuffAllocator.wrap(mbb);
}
}

View File

@ -30,6 +30,7 @@ import java.util.Arrays;
import org.apache.hadoop.hbase.io.ByteBufferWriter;
import org.apache.hadoop.hbase.io.util.StreamUtils;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.WritableUtils;
import org.apache.yetus.audience.InterfaceAudience;
@ -348,25 +349,39 @@ public final class ByteBufferUtils {
}
}
/**
* Similar to {@link WritableUtils#readVLong(DataInput)} but reads from a
* {@link ByteBuffer}.
*/
public static long readVLong(ByteBuffer in) {
byte firstByte = in.get();
private interface ByteVisitor {
byte get();
}
private static long readVLong(ByteVisitor visitor) {
byte firstByte = visitor.get();
int len = WritableUtils.decodeVIntSize(firstByte);
if (len == 1) {
return firstByte;
}
long i = 0;
for (int idx = 0; idx < len-1; idx++) {
byte b = in.get();
for (int idx = 0; idx < len - 1; idx++) {
byte b = visitor.get();
i = i << 8;
i = i | (b & 0xFF);
}
return (WritableUtils.isNegativeVInt(firstByte) ? (i ^ -1L) : i);
}
/**
* Similar to {@link WritableUtils#readVLong(DataInput)} but reads from a {@link ByteBuffer}.
*/
public static long readVLong(ByteBuffer in) {
return readVLong(in::get);
}
/**
* Similar to {@link WritableUtils#readVLong(java.io.DataInput)} but reads from a
* {@link ByteBuff}.
*/
public static long readVLong(ByteBuff in) {
return readVLong(in::get);
}
/**
* Put in buffer integer using 7 bit encoding. For each written byte:

View File

@ -0,0 +1,309 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.nio.ByteBuffer;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.nio.MultiByteBuff;
import org.apache.hadoop.hbase.nio.SingleByteBuff;
import org.apache.hadoop.hbase.testclassification.RPCTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category({ RPCTests.class, SmallTests.class })
public class TestByteBuffAllocator {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestByteBuffAllocator.class);
@Test
public void testAllocateByteBuffToReadInto() {
int maxBuffersInPool = 10;
int bufSize = 6 * 1024;
ByteBuffAllocator alloc = new ByteBuffAllocator(true, maxBuffersInPool, bufSize, bufSize / 6);
ByteBuff buff = alloc.allocate(10 * bufSize);
buff.release();
// When the request size is less than 1/6th of the pool buffer size. We should use on demand
// created on heap Buffer
buff = alloc.allocate(200);
assertTrue(buff.hasArray());
assertEquals(maxBuffersInPool, alloc.getQueueSize());
buff.release();
// When the request size is > 1/6th of the pool buffer size.
buff = alloc.allocate(1024);
assertFalse(buff.hasArray());
assertEquals(maxBuffersInPool - 1, alloc.getQueueSize());
buff.release();// ByteBuffDeallocaor#free should put back the BB to pool.
assertEquals(maxBuffersInPool, alloc.getQueueSize());
// Request size> pool buffer size
buff = alloc.allocate(7 * 1024);
assertFalse(buff.hasArray());
assertTrue(buff instanceof MultiByteBuff);
ByteBuffer[] bbs = buff.nioByteBuffers();
assertEquals(2, bbs.length);
assertTrue(bbs[0].isDirect());
assertTrue(bbs[1].isDirect());
assertEquals(6 * 1024, bbs[0].limit());
assertEquals(1024, bbs[1].limit());
assertEquals(maxBuffersInPool - 2, alloc.getQueueSize());
buff.release();
assertEquals(maxBuffersInPool, alloc.getQueueSize());
buff = alloc.allocate(6 * 1024 + 200);
assertFalse(buff.hasArray());
assertTrue(buff instanceof MultiByteBuff);
bbs = buff.nioByteBuffers();
assertEquals(2, bbs.length);
assertTrue(bbs[0].isDirect());
assertFalse(bbs[1].isDirect());
assertEquals(6 * 1024, bbs[0].limit());
assertEquals(200, bbs[1].limit());
assertEquals(maxBuffersInPool - 1, alloc.getQueueSize());
buff.release();
assertEquals(maxBuffersInPool, alloc.getQueueSize());
alloc.allocate(bufSize * (maxBuffersInPool - 1));
buff = alloc.allocate(20 * 1024);
assertFalse(buff.hasArray());
assertTrue(buff instanceof MultiByteBuff);
bbs = buff.nioByteBuffers();
assertEquals(2, bbs.length);
assertTrue(bbs[0].isDirect());
assertFalse(bbs[1].isDirect());
assertEquals(6 * 1024, bbs[0].limit());
assertEquals(14 * 1024, bbs[1].limit());
assertEquals(0, alloc.getQueueSize());
buff.release();
assertEquals(1, alloc.getQueueSize());
alloc.allocateOneBuffer();
buff = alloc.allocate(7 * 1024);
assertTrue(buff.hasArray());
assertTrue(buff instanceof SingleByteBuff);
assertEquals(7 * 1024, buff.nioByteBuffers()[0].limit());
buff.release();
}
@Test
public void testNegativeAllocatedSize() {
int maxBuffersInPool = 10;
ByteBuffAllocator allocator =
new ByteBuffAllocator(true, maxBuffersInPool, 6 * 1024, 1024);
try {
allocator.allocate(-1);
fail("Should throw exception when size < 0");
} catch (IllegalArgumentException e) {
// expected exception
}
ByteBuff bb = allocator.allocate(0);
bb.release();
}
@Test
public void testAllocateOneBuffer() {
// Allocate from on-heap
ByteBuffAllocator allocator = ByteBuffAllocator.createOnHeap();
ByteBuff buf = allocator.allocateOneBuffer();
assertTrue(buf.hasArray());
assertEquals(ByteBuffAllocator.DEFAULT_BUFFER_SIZE, buf.remaining());
buf.release();
// Allocate from off-heap
int bufSize = 10;
allocator = new ByteBuffAllocator(true, 1, 10, 3);
buf = allocator.allocateOneBuffer();
assertFalse(buf.hasArray());
assertEquals(buf.remaining(), bufSize);
// The another one will be allocated from on-heap because the pool has only one ByteBuffer,
// and still not be cleaned.
ByteBuff buf2 = allocator.allocateOneBuffer();
assertTrue(buf2.hasArray());
assertEquals(buf2.remaining(), bufSize);
// free the first one
buf.release();
// The next one will be off-heap again.
buf = allocator.allocateOneBuffer();
assertFalse(buf.hasArray());
assertEquals(buf.remaining(), bufSize);
buf.release();
}
@Test
public void testReferenceCount() {
int bufSize = 64;
ByteBuffAllocator alloc = new ByteBuffAllocator(true, 2, bufSize, 3);
ByteBuff buf1 = alloc.allocate(bufSize * 2);
assertFalse(buf1.hasArray());
// The next one will be allocated from heap
ByteBuff buf2 = alloc.allocateOneBuffer();
assertTrue(buf2.hasArray());
// duplicate the buf2, if the dup released, buf2 will also be released (SingleByteBuffer)
ByteBuff dup2 = buf2.duplicate();
dup2.release();
assertEquals(0, buf2.refCnt());
assertEquals(0, dup2.refCnt());
assertEquals(0, alloc.getQueueSize());
assertException(dup2::position);
assertException(buf2::position);
// duplicate the buf1, if the dup1 released, buf1 will also be released (MultipleByteBuffer)
ByteBuff dup1 = buf1.duplicate();
dup1.release();
assertEquals(0, buf1.refCnt());
assertEquals(0, dup1.refCnt());
assertEquals(2, alloc.getQueueSize());
assertException(dup1::position);
assertException(buf1::position);
// slice the buf3, if the slice3 released, buf3 will also be released (SingleByteBuffer)
ByteBuff buf3 = alloc.allocateOneBuffer();
assertFalse(buf3.hasArray());
ByteBuff slice3 = buf3.slice();
slice3.release();
assertEquals(0, buf3.refCnt());
assertEquals(0, slice3.refCnt());
assertEquals(2, alloc.getQueueSize());
// slice the buf4, if the slice4 released, buf4 will also be released (MultipleByteBuffer)
ByteBuff buf4 = alloc.allocate(bufSize * 2);
assertFalse(buf4.hasArray());
ByteBuff slice4 = buf4.slice();
slice4.release();
assertEquals(0, buf4.refCnt());
assertEquals(0, slice4.refCnt());
assertEquals(2, alloc.getQueueSize());
// Test multiple reference for the same ByteBuff (SingleByteBuff)
ByteBuff buf5 = alloc.allocateOneBuffer();
ByteBuff slice5 = buf5.duplicate().duplicate().duplicate().slice().slice();
slice5.release();
assertEquals(0, buf5.refCnt());
assertEquals(0, slice5.refCnt());
assertEquals(2, alloc.getQueueSize());
assertException(slice5::position);
assertException(buf5::position);
// Test multiple reference for the same ByteBuff (SingleByteBuff)
ByteBuff buf6 = alloc.allocate(bufSize >> 2);
ByteBuff slice6 = buf6.duplicate().duplicate().duplicate().slice().slice();
slice6.release();
assertEquals(0, buf6.refCnt());
assertEquals(0, slice6.refCnt());
assertEquals(2, alloc.getQueueSize());
// Test retain the parent SingleByteBuff (duplicate)
ByteBuff parent = alloc.allocateOneBuffer();
ByteBuff child = parent.duplicate();
child.retain();
parent.release();
assertEquals(1, child.refCnt());
assertEquals(1, parent.refCnt());
assertEquals(1, alloc.getQueueSize());
parent.release();
assertEquals(0, child.refCnt());
assertEquals(0, parent.refCnt());
assertEquals(2, alloc.getQueueSize());
// Test retain parent MultiByteBuff (duplicate)
parent = alloc.allocate(bufSize << 1);
child = parent.duplicate();
child.retain();
parent.release();
assertEquals(1, child.refCnt());
assertEquals(1, parent.refCnt());
assertEquals(0, alloc.getQueueSize());
parent.release();
assertEquals(0, child.refCnt());
assertEquals(0, parent.refCnt());
assertEquals(2, alloc.getQueueSize());
// Test retain the parent SingleByteBuff (slice)
parent = alloc.allocateOneBuffer();
child = parent.slice();
child.retain();
parent.release();
assertEquals(1, child.refCnt());
assertEquals(1, parent.refCnt());
assertEquals(1, alloc.getQueueSize());
parent.release();
assertEquals(0, child.refCnt());
assertEquals(0, parent.refCnt());
assertEquals(2, alloc.getQueueSize());
// Test retain parent MultiByteBuff (slice)
parent = alloc.allocate(bufSize << 1);
child = parent.slice();
child.retain();
parent.release();
assertEquals(1, child.refCnt());
assertEquals(1, parent.refCnt());
assertEquals(0, alloc.getQueueSize());
parent.release();
assertEquals(0, child.refCnt());
assertEquals(0, parent.refCnt());
assertEquals(2, alloc.getQueueSize());
}
@Test
public void testReverseRef() {
int bufSize = 64;
ByteBuffAllocator alloc = new ByteBuffAllocator(true, 1, bufSize, 3);
ByteBuff buf1 = alloc.allocate(bufSize);
ByteBuff dup1 = buf1.duplicate();
assertEquals(1, buf1.refCnt());
assertEquals(1, dup1.refCnt());
buf1.release();
assertEquals(0, buf1.refCnt());
assertEquals(0, dup1.refCnt());
assertEquals(1, alloc.getQueueSize());
assertException(buf1::position);
assertException(dup1::position);
}
@Test
public void testByteBuffUnsupportedMethods() {
int bufSize = 64;
ByteBuffAllocator alloc = new ByteBuffAllocator(true, 1, bufSize, 3);
ByteBuff buf = alloc.allocate(bufSize);
assertException(() -> buf.retain(2));
assertException(() -> buf.release(2));
assertException(() -> buf.touch());
assertException(() -> buf.touch(new Object()));
}
private void assertException(Runnable r) {
try {
r.run();
fail();
} catch (Exception e) {
// expected exception.
}
}
}

View File

@ -23,6 +23,7 @@ import static org.junit.Assert.assertTrue;
import java.nio.ByteBuffer;
import java.util.List;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
@ -40,29 +41,30 @@ public class TestByteBufferListOutputStream {
@Test
public void testWrites() throws Exception {
ByteBufferPool pool = new ByteBufferPool(10, 3);
ByteBufferListOutputStream bbos = new ByteBufferListOutputStream(pool);
ByteBuffAllocator alloc = new ByteBuffAllocator(true, 3, 10, 10 / 6);
ByteBufferListOutputStream bbos = new ByteBufferListOutputStream(alloc);
bbos.write(2);// Write a byte
bbos.writeInt(100);// Write an int
byte[] b = Bytes.toBytes("row123");// 6 bytes
bbos.write(b);
assertEquals(2, bbos.allBufs.size());
// Just use the 3rd BB from pool so that pabos, on request, wont get one
ByteBuffer bb1 = pool.getBuffer();
ByteBuff bb1 = alloc.allocateOneBuffer();
ByteBuffer bb = ByteBuffer.wrap(Bytes.toBytes("row123_cf1_q1"));// 13 bytes
bbos.write(bb, 0, bb.capacity());
pool.putbackBuffer(bb1);
bb1.release();
bbos.writeInt(123);
bbos.writeInt(124);
assertEquals(0, pool.getQueueSize());
assertEquals(0, alloc.getQueueSize());
List<ByteBuffer> allBufs = bbos.getByteBuffers();
assertEquals(4, allBufs.size());
assertEquals(3, bbos.bufsFromPool.size());
assertEquals(4, bbos.allBufs.size());
ByteBuffer b1 = allBufs.get(0);
assertEquals(10, b1.remaining());
assertEquals(2, b1.get());
assertEquals(100, b1.getInt());
byte[] bActual = new byte[b.length];
b1.get(bActual, 0, 5);//5 bytes in 1st BB
b1.get(bActual, 0, 5);// 5 bytes in 1st BB
ByteBuffer b2 = allBufs.get(1);
assertEquals(10, b2.remaining());
b2.get(bActual, 5, 1);// Remaining 1 byte in 2nd BB
@ -78,6 +80,6 @@ public class TestByteBufferListOutputStream {
assertEquals(4, b4.remaining());
assertEquals(124, b4.getInt());
bbos.releaseResources();
assertEquals(3, pool.getQueueSize());
assertEquals(3, alloc.getQueueSize());
}
}

View File

@ -1,67 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io;
import static org.junit.Assert.assertEquals;
import java.nio.ByteBuffer;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category({ IOTests.class, SmallTests.class })
public class TestByteBufferPool {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestByteBufferPool.class);
@Test
public void testOffheapBBPool() throws Exception {
boolean directByteBuffer = true;
testBBPool(10, 100, directByteBuffer);
}
@Test
public void testOnheapBBPool() throws Exception {
boolean directByteBuffer = false;
testBBPool(10, 100, directByteBuffer);
}
private void testBBPool(int maxPoolSize, int bufferSize, boolean directByteBuffer) {
ByteBufferPool pool = new ByteBufferPool(bufferSize, maxPoolSize, directByteBuffer);
for (int i = 0; i < maxPoolSize; i++) {
ByteBuffer buffer = pool.getBuffer();
assertEquals(0, buffer.position());
assertEquals(bufferSize, buffer.limit());
assertEquals(directByteBuffer, buffer.isDirect());
}
assertEquals(0, pool.getQueueSize());
ByteBuffer bb = directByteBuffer ? ByteBuffer.allocate(bufferSize)
: ByteBuffer.allocateDirect(bufferSize);
pool.putbackBuffer(bb);
assertEquals(0, pool.getQueueSize());
bb = directByteBuffer ? ByteBuffer.allocateDirect(bufferSize + 1)
: ByteBuffer.allocate(bufferSize + 1);
pool.putbackBuffer(bb);
assertEquals(0, pool.getQueueSize());
}
}

View File

@ -286,12 +286,12 @@ public class TestMultiByteBuff {
multi.putInt(45);
multi.position(1);
multi.limit(multi.position() + (2 * Bytes.SIZEOF_LONG));
MultiByteBuff sliced = multi.slice();
ByteBuff sliced = multi.slice();
assertEquals(0, sliced.position());
assertEquals((2 * Bytes.SIZEOF_LONG), sliced.limit());
assertEquals(l1, sliced.getLong());
assertEquals(l2, sliced.getLong());
MultiByteBuff dup = multi.duplicate();
ByteBuff dup = multi.duplicate();
assertEquals(1, dup.position());
assertEquals(dup.position() + (2 * Bytes.SIZEOF_LONG), dup.limit());
assertEquals(l1, dup.getLong());

View File

@ -69,11 +69,10 @@ public interface Cacheable extends HeapSize {
/**
* SHARED means when this Cacheable is read back from cache it refers to the same memory area as
* used by the cache for caching it.
* EXCLUSIVE means when this Cacheable is read back from cache, the data was copied to an
* exclusive memory area of this Cacheable.
* used by the cache for caching it. EXCLUSIVE means when this Cacheable is read back from cache,
* the data was copied to an exclusive memory area of this Cacheable.
*/
public static enum MemoryType {
enum MemoryType {
SHARED, EXCLUSIVE
}
}

View File

@ -127,7 +127,7 @@ public class NettyRpcFrameDecoder extends ByteToMessageDecoder {
NettyServerCall reqTooBig =
new NettyServerCall(header.getCallId(), connection.service, null, null, null, null,
connection, 0, connection.addr, System.currentTimeMillis(), 0,
connection.rpcServer.reservoir, connection.rpcServer.cellBlockBuilder, null);
connection.rpcServer.bbAllocator, connection.rpcServer.cellBlockBuilder, null);
connection.rpcServer.metrics.exception(SimpleRpcServer.REQUEST_TOO_BIG_EXCEPTION);

View File

@ -187,7 +187,7 @@ public class NettyRpcServer extends RpcServer {
Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status,
long startTime, int timeout) throws IOException {
NettyServerCall fakeCall = new NettyServerCall(-1, service, md, null, param, cellScanner, null,
-1, null, receiveTime, timeout, reservoir, cellBlockBuilder, null);
-1, null, receiveTime, timeout, bbAllocator, cellBlockBuilder, null);
return call(fakeCall, status);
}
}

View File

@ -21,9 +21,9 @@ import java.io.IOException;
import java.net.InetAddress;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.io.ByteBufferPool;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.ipc.RpcServer.CallCleanup;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
@ -39,10 +39,10 @@ class NettyServerCall extends ServerCall<NettyServerRpcConnection> {
NettyServerCall(int id, BlockingService service, MethodDescriptor md, RequestHeader header,
Message param, CellScanner cellScanner, NettyServerRpcConnection connection, long size,
InetAddress remoteAddress, long receiveTime, int timeout,
ByteBufferPool reservoir, CellBlockBuilder cellBlockBuilder, CallCleanup reqCleanup) {
super(id, service, md, header, param, cellScanner, connection, size, remoteAddress,
receiveTime, timeout, reservoir, cellBlockBuilder, reqCleanup);
InetAddress remoteAddress, long receiveTime, int timeout, ByteBuffAllocator bbAllocator,
CellBlockBuilder cellBlockBuilder, CallCleanup reqCleanup) {
super(id, service, md, header, param, cellScanner, connection, size, remoteAddress, receiveTime,
timeout, bbAllocator, cellBlockBuilder, reqCleanup);
}
/**

View File

@ -59,12 +59,7 @@ class NettyServerRpcConnection extends ServerRpcConnection {
void process(final ByteBuf buf) throws IOException, InterruptedException {
if (connectionHeaderRead) {
this.callCleanup = new RpcServer.CallCleanup() {
@Override
public void run() {
buf.release();
}
};
this.callCleanup = buf::release;
process(new SingleByteBuff(buf.nioBuffer()));
} else {
ByteBuffer connectionHeader = ByteBuffer.allocate(buf.readableBytes());
@ -121,7 +116,7 @@ class NettyServerRpcConnection extends ServerRpcConnection {
long size, final InetAddress remoteAddress, int timeout,
CallCleanup reqCleanup) {
return new NettyServerCall(id, service, md, header, param, cellScanner, this, size,
remoteAddress, System.currentTimeMillis(), timeout, this.rpcServer.reservoir,
remoteAddress, System.currentTimeMillis(), timeout, this.rpcServer.bbAllocator,
this.rpcServer.cellBlockBuilder, reqCleanup);
}

View File

@ -26,7 +26,6 @@ import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@ -38,16 +37,12 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CallQueueTooBigException;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.hadoop.hbase.exceptions.RequestTooBigException;
import org.apache.hadoop.hbase.io.ByteBufferPool;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.nio.MultiByteBuff;
import org.apache.hadoop.hbase.nio.SingleByteBuff;
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
import org.apache.hadoop.hbase.security.SaslUtil;
import org.apache.hadoop.hbase.security.SaslUtil.QualityOfProtection;
@ -210,11 +205,7 @@ public abstract class RpcServer implements RpcServerInterface,
protected UserProvider userProvider;
protected final ByteBufferPool reservoir;
// The requests and response will use buffers from ByteBufferPool, when the size of the
// request/response is at least this size.
// We make this to be 1/6th of the pool buffer size.
protected final int minSizeForReservoirUse;
protected final ByteBuffAllocator bbAllocator;
protected volatile boolean allowFallbackToSimpleAuth;
@ -225,7 +216,7 @@ public abstract class RpcServer implements RpcServerInterface,
private RSRpcServices rsRpcServices;
@FunctionalInterface
protected static interface CallCleanup {
protected interface CallCleanup {
void run();
}
@ -266,32 +257,7 @@ public abstract class RpcServer implements RpcServerInterface,
final List<BlockingServiceAndInterface> services,
final InetSocketAddress bindAddress, Configuration conf,
RpcScheduler scheduler, boolean reservoirEnabled) throws IOException {
if (reservoirEnabled) {
int poolBufSize = conf.getInt(ByteBufferPool.BUFFER_SIZE_KEY,
ByteBufferPool.DEFAULT_BUFFER_SIZE);
// The max number of buffers to be pooled in the ByteBufferPool. The default value been
// selected based on the #handlers configured. When it is read request, 2 MB is the max size
// at which we will send back one RPC request. Means max we need 2 MB for creating the
// response cell block. (Well it might be much lesser than this because in 2 MB size calc, we
// include the heap size overhead of each cells also.) Considering 2 MB, we will need
// (2 * 1024 * 1024) / poolBufSize buffers to make the response cell block. Pool buffer size
// is by default 64 KB.
// In case of read request, at the end of the handler process, we will make the response
// cellblock and add the Call to connection's response Q and a single Responder thread takes
// connections and responses from that one by one and do the socket write. So there is chances
// that by the time a handler originated response is actually done writing to socket and so
// released the BBs it used, the handler might have processed one more read req. On an avg 2x
// we consider and consider that also for the max buffers to pool
int bufsForTwoMB = (2 * 1024 * 1024) / poolBufSize;
int maxPoolSize = conf.getInt(ByteBufferPool.MAX_POOL_SIZE_KEY,
conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT) * bufsForTwoMB * 2);
this.reservoir = new ByteBufferPool(poolBufSize, maxPoolSize);
this.minSizeForReservoirUse = getMinSizeForReservoirUse(this.reservoir);
} else {
reservoir = null;
this.minSizeForReservoirUse = Integer.MAX_VALUE;// reservoir itself not in place.
}
this.bbAllocator = ByteBuffAllocator.create(conf, reservoirEnabled);
this.server = server;
this.services = services;
this.bindAddress = bindAddress;
@ -325,11 +291,6 @@ public abstract class RpcServer implements RpcServerInterface,
this.scheduler = scheduler;
}
@VisibleForTesting
static int getMinSizeForReservoirUse(ByteBufferPool pool) {
return pool.getBufferSize() / 6;
}
@Override
public void onConfigurationChange(Configuration newConf) {
initReconfigurable(newConf);
@ -651,55 +612,6 @@ public abstract class RpcServer implements RpcServerInterface,
return (nBytes > 0) ? nBytes : ret;
}
/**
* This is extracted to a static method for better unit testing. We try to get buffer(s) from pool
* as much as possible.
*
* @param pool The ByteBufferPool to use
* @param minSizeForPoolUse Only for buffer size above this, we will try to use pool. Any buffer
* need of size below this, create on heap ByteBuffer.
* @param reqLen Bytes count in request
*/
@VisibleForTesting
static Pair<ByteBuff, CallCleanup> allocateByteBuffToReadInto(ByteBufferPool pool,
int minSizeForPoolUse, int reqLen) {
ByteBuff resultBuf;
List<ByteBuffer> bbs = new ArrayList<>((reqLen / pool.getBufferSize()) + 1);
int remain = reqLen;
ByteBuffer buf = null;
while (remain >= minSizeForPoolUse && (buf = pool.getBuffer()) != null) {
bbs.add(buf);
remain -= pool.getBufferSize();
}
ByteBuffer[] bufsFromPool = null;
if (bbs.size() > 0) {
bufsFromPool = new ByteBuffer[bbs.size()];
bbs.toArray(bufsFromPool);
}
if (remain > 0) {
bbs.add(ByteBuffer.allocate(remain));
}
if (bbs.size() > 1) {
ByteBuffer[] items = new ByteBuffer[bbs.size()];
bbs.toArray(items);
resultBuf = new MultiByteBuff(items);
} else {
// We are backed by single BB
resultBuf = new SingleByteBuff(bbs.get(0));
}
resultBuf.limit(reqLen);
if (bufsFromPool != null) {
final ByteBuffer[] bufsFromPoolFinal = bufsFromPool;
return new Pair<>(resultBuf, () -> {
// Return back all the BBs to pool
for (int i = 0; i < bufsFromPoolFinal.length; i++) {
pool.putbackBuffer(bufsFromPoolFinal[i]);
}
});
}
return new Pair<>(resultBuf, null);
}
/**
* Needed for features such as delayed calls. We need to be able to store the current call
* so that we can complete it later or ask questions of what is supported by the current ongoing

View File

@ -26,10 +26,10 @@ import java.util.Optional;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.exceptions.RegionMovedException;
import org.apache.hadoop.hbase.io.ByteBufferListOutputStream;
import org.apache.hadoop.hbase.io.ByteBufferPool;
import org.apache.hadoop.hbase.ipc.RpcServer.CallCleanup;
import org.apache.hadoop.hbase.security.User;
import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
@ -67,7 +67,7 @@ abstract class ServerCall<T extends ServerRpcConnection> implements RpcCall, Rpc
protected long startTime;
protected final long deadline;// the deadline to handle this call, if exceed we can drop it.
protected final ByteBufferPool reservoir;
protected final ByteBuffAllocator bbAllocator;
protected final CellBlockBuilder cellBlockBuilder;
@ -91,11 +91,11 @@ abstract class ServerCall<T extends ServerRpcConnection> implements RpcCall, Rpc
private long exceptionSize = 0;
private final boolean retryImmediatelySupported;
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_NULL_ON_SOME_PATH",
justification="Can't figure why this complaint is happening... see below")
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NP_NULL_ON_SOME_PATH",
justification = "Can't figure why this complaint is happening... see below")
ServerCall(int id, BlockingService service, MethodDescriptor md, RequestHeader header,
Message param, CellScanner cellScanner, T connection, long size,
InetAddress remoteAddress, long receiveTime, int timeout, ByteBufferPool reservoir,
Message param, CellScanner cellScanner, T connection, long size, InetAddress remoteAddress,
long receiveTime, int timeout, ByteBuffAllocator byteBuffAllocator,
CellBlockBuilder cellBlockBuilder, CallCleanup reqCleanup) {
this.id = id;
this.service = service;
@ -118,7 +118,7 @@ abstract class ServerCall<T extends ServerRpcConnection> implements RpcCall, Rpc
this.remoteAddress = remoteAddress;
this.timeout = timeout;
this.deadline = this.timeout > 0 ? this.receiveTime + this.timeout : Long.MAX_VALUE;
this.reservoir = reservoir;
this.bbAllocator = byteBuffAllocator;
this.cellBlockBuilder = cellBlockBuilder;
this.reqCleanup = reqCleanup;
}
@ -199,9 +199,9 @@ abstract class ServerCall<T extends ServerRpcConnection> implements RpcCall, Rpc
// high when we can avoid a big buffer allocation on each rpc.
List<ByteBuffer> cellBlock = null;
int cellBlockSize = 0;
if (this.reservoir != null) {
if (bbAllocator.isReservoirEnabled()) {
this.cellBlockStream = this.cellBlockBuilder.buildCellBlockStream(this.connection.codec,
this.connection.compressionCodec, cells, this.reservoir);
this.connection.compressionCodec, cells, bbAllocator);
if (this.cellBlockStream != null) {
cellBlock = this.cellBlockStream.getByteBuffers();
cellBlockSize = this.cellBlockStream.size();

View File

@ -488,7 +488,7 @@ public class SimpleRpcServer extends RpcServer {
Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status,
long startTime, int timeout) throws IOException {
SimpleServerCall fakeCall = new SimpleServerCall(-1, service, md, null, param, cellScanner,
null, -1, null, receiveTime, timeout, reservoir, cellBlockBuilder, null, null);
null, -1, null, receiveTime, timeout, bbAllocator, cellBlockBuilder, null, null);
return call(fakeCall, status);
}

View File

@ -21,9 +21,9 @@ import java.io.IOException;
import java.net.InetAddress;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.io.ByteBufferPool;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.ipc.RpcServer.CallCleanup;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
@ -42,11 +42,12 @@ class SimpleServerCall extends ServerCall<SimpleServerRpcConnection> {
justification = "Can't figure why this complaint is happening... see below")
SimpleServerCall(int id, final BlockingService service, final MethodDescriptor md,
RequestHeader header, Message param, CellScanner cellScanner,
SimpleServerRpcConnection connection, long size,
final InetAddress remoteAddress, long receiveTime, int timeout, ByteBufferPool reservoir,
CellBlockBuilder cellBlockBuilder, CallCleanup reqCleanup, SimpleRpcServerResponder responder) {
super(id, service, md, header, param, cellScanner, connection, size, remoteAddress,
receiveTime, timeout, reservoir, cellBlockBuilder, reqCleanup);
SimpleServerRpcConnection connection, long size, final InetAddress remoteAddress,
long receiveTime, int timeout, ByteBuffAllocator bbAllocator,
CellBlockBuilder cellBlockBuilder, CallCleanup reqCleanup,
SimpleRpcServerResponder responder) {
super(id, service, md, header, param, cellScanner, connection, size, remoteAddress, receiveTime,
timeout, bbAllocator, cellBlockBuilder, reqCleanup);
this.responder = responder;
}

View File

@ -36,14 +36,12 @@ import org.apache.hadoop.hbase.client.VersionInfoUtil;
import org.apache.hadoop.hbase.exceptions.RequestTooBigException;
import org.apache.hadoop.hbase.ipc.RpcServer.CallCleanup;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.nio.SingleByteBuff;
import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
import org.apache.hbase.thirdparty.com.google.protobuf.CodedInputStream;
import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
import org.apache.hadoop.hbase.util.Pair;
/** Reads calls from a connection and queues them for handling. */
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "VO_VOLATILE_INCREMENT",
@ -212,7 +210,7 @@ class SimpleServerRpcConnection extends ServerRpcConnection {
// Notify the client about the offending request
SimpleServerCall reqTooBig = new SimpleServerCall(header.getCallId(), this.service, null,
null, null, null, this, 0, this.addr, System.currentTimeMillis(), 0,
this.rpcServer.reservoir, this.rpcServer.cellBlockBuilder, null, responder);
this.rpcServer.bbAllocator, this.rpcServer.cellBlockBuilder, null, responder);
this.rpcServer.metrics.exception(SimpleRpcServer.REQUEST_TOO_BIG_EXCEPTION);
// Make sure the client recognizes the underlying exception
// Otherwise, throw a DoNotRetryIOException.
@ -255,24 +253,8 @@ class SimpleServerRpcConnection extends ServerRpcConnection {
// It creates the ByteBuff and CallCleanup and assign to Connection instance.
private void initByteBuffToReadInto(int length) {
// We create random on heap buffers are read into those when
// 1. ByteBufferPool is not there.
// 2. When the size of the req is very small. Using a large sized (64 KB) buffer from pool is
// waste then. Also if all the reqs are of this size, we will be creating larger sized
// buffers and pool them permanently. This include Scan/Get request and DDL kind of reqs like
// RegionOpen.
// 3. If it is an initial handshake signal or initial connection request. Any way then
// condition 2 itself will match
// 4. When SASL use is ON.
if (this.rpcServer.reservoir == null || skipInitialSaslHandshake || !connectionHeaderRead ||
useSasl || length < this.rpcServer.minSizeForReservoirUse) {
this.data = new SingleByteBuff(ByteBuffer.allocate(length));
} else {
Pair<ByteBuff, CallCleanup> pair = RpcServer.allocateByteBuffToReadInto(
this.rpcServer.reservoir, this.rpcServer.minSizeForReservoirUse, length);
this.data = pair.getFirst();
this.callCleanup = pair.getSecond();
}
this.data = rpcServer.bbAllocator.allocate(length);
this.callCleanup = data::release;
}
protected int channelDataRead(ReadableByteChannel channel, ByteBuff buf) throws IOException {
@ -345,7 +327,7 @@ class SimpleServerRpcConnection extends ServerRpcConnection {
RequestHeader header, Message param, CellScanner cellScanner, long size,
InetAddress remoteAddress, int timeout, CallCleanup reqCleanup) {
return new SimpleServerCall(id, service, md, header, param, cellScanner, this, size,
remoteAddress, System.currentTimeMillis(), timeout, this.rpcServer.reservoir,
remoteAddress, System.currentTimeMillis(), timeout, this.rpcServer.bbAllocator,
this.rpcServer.cellBlockBuilder, reqCleanup, this.responder);
}

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT;
import static org.apache.hadoop.hbase.io.ByteBuffAllocator.MAX_BUFFER_COUNT_KEY;
import static org.apache.hadoop.hbase.master.LoadBalancer.TABLES_ON_MASTER;
import static org.junit.Assert.assertEquals;
@ -42,7 +43,6 @@ import org.apache.hadoop.hbase.MemoryCompactionPolicy;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
import org.apache.hadoop.hbase.io.ByteBufferPool;
import org.apache.hadoop.hbase.regionserver.CompactingMemStore;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.testclassification.ClientTests;
@ -94,7 +94,7 @@ public class TestAsyncTableGetMultiThreaded {
protected static void setUp(MemoryCompactionPolicy memoryCompaction) throws Exception {
TEST_UTIL.getConfiguration().set(TABLES_ON_MASTER, "none");
TEST_UTIL.getConfiguration().setLong(HBASE_CLIENT_META_OPERATION_TIMEOUT, 60000L);
TEST_UTIL.getConfiguration().setInt(ByteBufferPool.MAX_POOL_SIZE_KEY, 100);
TEST_UTIL.getConfiguration().setInt(MAX_BUFFER_COUNT_KEY, 100);
TEST_UTIL.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
String.valueOf(memoryCompaction));

View File

@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.io.ByteBuffAllocator.BUFFER_SIZE_KEY;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
@ -26,7 +28,6 @@ import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.io.ByteBufferPool;
import org.apache.hadoop.hbase.ipc.NettyRpcServer;
import org.apache.hadoop.hbase.ipc.RpcServerFactory;
import org.apache.hadoop.hbase.ipc.SimpleRpcServer;
@ -71,9 +72,8 @@ public class TestServerLoadDurability {
private static Configuration createConfigurationForSimpleRpcServer() {
Configuration conf = HBaseConfiguration.create();
conf.set(RpcServerFactory.CUSTOM_RPC_SERVER_IMPL_CONF_KEY,
SimpleRpcServer.class.getName());
conf.setInt(ByteBufferPool.BUFFER_SIZE_KEY, 20);
conf.set(RpcServerFactory.CUSTOM_RPC_SERVER_IMPL_CONF_KEY, SimpleRpcServer.class.getName());
conf.setInt(BUFFER_SIZE_KEY, 20);
return conf;
}

View File

@ -832,7 +832,7 @@ public class TestHFileBlock {
if (ClassSize.is32BitJVM()) {
assertEquals(64, HFileBlock.MULTI_BYTE_BUFFER_HEAP_SIZE);
} else {
assertEquals(72, HFileBlock.MULTI_BYTE_BUFFER_HEAP_SIZE);
assertEquals(80, HFileBlock.MULTI_BYTE_BUFFER_HEAP_SIZE);
}
for (int size : new int[] { 100, 256, 12345 }) {

View File

@ -1,144 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.ipc;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.nio.ByteBuffer;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.io.ByteBufferPool;
import org.apache.hadoop.hbase.ipc.RpcServer.CallCleanup;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.nio.MultiByteBuff;
import org.apache.hadoop.hbase.nio.SingleByteBuff;
import org.apache.hadoop.hbase.testclassification.RPCTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Pair;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category({ RPCTests.class, SmallTests.class })
public class TestRpcServer {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestRpcServer.class);
@Test
public void testAllocateByteBuffToReadInto() throws Exception {
int maxBuffersInPool = 10;
ByteBufferPool pool = new ByteBufferPool(6 * 1024, maxBuffersInPool);
initPoolWithAllBuffers(pool, maxBuffersInPool);
ByteBuff buff = null;
Pair<ByteBuff, CallCleanup> pair;
// When the request size is less than 1/6th of the pool buffer size. We should use on demand
// created on heap Buffer
pair = RpcServer.allocateByteBuffToReadInto(pool, RpcServer.getMinSizeForReservoirUse(pool),
200);
buff = pair.getFirst();
assertTrue(buff.hasArray());
assertEquals(maxBuffersInPool, pool.getQueueSize());
assertNull(pair.getSecond());
// When the request size is > 1/6th of the pool buffer size.
pair = RpcServer.allocateByteBuffToReadInto(pool, RpcServer.getMinSizeForReservoirUse(pool),
1024);
buff = pair.getFirst();
assertFalse(buff.hasArray());
assertEquals(maxBuffersInPool - 1, pool.getQueueSize());
assertNotNull(pair.getSecond());
pair.getSecond().run();// CallCleanup#run should put back the BB to pool.
assertEquals(maxBuffersInPool, pool.getQueueSize());
// Request size> pool buffer size
pair = RpcServer.allocateByteBuffToReadInto(pool, RpcServer.getMinSizeForReservoirUse(pool),
7 * 1024);
buff = pair.getFirst();
assertFalse(buff.hasArray());
assertTrue(buff instanceof MultiByteBuff);
ByteBuffer[] bbs = ((MultiByteBuff) buff).getEnclosingByteBuffers();
assertEquals(2, bbs.length);
assertTrue(bbs[0].isDirect());
assertTrue(bbs[1].isDirect());
assertEquals(6 * 1024, bbs[0].limit());
assertEquals(1024, bbs[1].limit());
assertEquals(maxBuffersInPool - 2, pool.getQueueSize());
assertNotNull(pair.getSecond());
pair.getSecond().run();// CallCleanup#run should put back the BB to pool.
assertEquals(maxBuffersInPool, pool.getQueueSize());
pair = RpcServer.allocateByteBuffToReadInto(pool, RpcServer.getMinSizeForReservoirUse(pool),
6 * 1024 + 200);
buff = pair.getFirst();
assertFalse(buff.hasArray());
assertTrue(buff instanceof MultiByteBuff);
bbs = ((MultiByteBuff) buff).getEnclosingByteBuffers();
assertEquals(2, bbs.length);
assertTrue(bbs[0].isDirect());
assertFalse(bbs[1].isDirect());
assertEquals(6 * 1024, bbs[0].limit());
assertEquals(200, bbs[1].limit());
assertEquals(maxBuffersInPool - 1, pool.getQueueSize());
assertNotNull(pair.getSecond());
pair.getSecond().run();// CallCleanup#run should put back the BB to pool.
assertEquals(maxBuffersInPool, pool.getQueueSize());
ByteBuffer[] buffers = new ByteBuffer[maxBuffersInPool - 1];
for (int i = 0; i < maxBuffersInPool - 1; i++) {
buffers[i] = pool.getBuffer();
}
pair = RpcServer.allocateByteBuffToReadInto(pool, RpcServer.getMinSizeForReservoirUse(pool),
20 * 1024);
buff = pair.getFirst();
assertFalse(buff.hasArray());
assertTrue(buff instanceof MultiByteBuff);
bbs = ((MultiByteBuff) buff).getEnclosingByteBuffers();
assertEquals(2, bbs.length);
assertTrue(bbs[0].isDirect());
assertFalse(bbs[1].isDirect());
assertEquals(6 * 1024, bbs[0].limit());
assertEquals(14 * 1024, bbs[1].limit());
assertEquals(0, pool.getQueueSize());
assertNotNull(pair.getSecond());
pair.getSecond().run();// CallCleanup#run should put back the BB to pool.
assertEquals(1, pool.getQueueSize());
pool.getBuffer();
pair = RpcServer.allocateByteBuffToReadInto(pool, RpcServer.getMinSizeForReservoirUse(pool),
7 * 1024);
buff = pair.getFirst();
assertTrue(buff.hasArray());
assertTrue(buff instanceof SingleByteBuff);
assertEquals(7 * 1024, ((SingleByteBuff) buff).getEnclosingByteBuffer().limit());
assertNull(pair.getSecond());
}
private void initPoolWithAllBuffers(ByteBufferPool pool, int maxBuffersInPool) {
ByteBuffer[] buffers = new ByteBuffer[maxBuffersInPool];
// Just call getBuffer() on pool 'maxBuffersInPool' so as to init all buffers and then put back
// all. Makes pool with max #buffers.
for (int i = 0; i < maxBuffersInPool; i++) {
buffers[i] = pool.getBuffer();
}
for (ByteBuffer buf : buffers) {
pool.putbackBuffer(buf);
}
}
}