HBASE-13142 [PERF] Reuse the IPCUtil#buildCellBlock buffer

Rename ByteBufferReservoir as BoundedByteBufferPool
stack 2015-03-04 22:44:20 -08:00
parent 5bd27af8b0
commit 55f8f56ad2
6 changed files with 356 additions and 28 deletions

IPCUtil.java

@@ -65,6 +65,7 @@ public class IPCUtil {
     this.conf = conf;
     this.cellBlockDecompressionMultiplier =
         conf.getInt("hbase.ipc.cellblock.decompression.buffersize.multiplier", 3);
+
     // Guess that 16k is a good size for rpc buffer.  Could go bigger.  See the TODO below in
     // #buildCellBlock.
     this.cellBlockBuildingInitialBufferSize =
@@ -90,24 +91,45 @@ public class IPCUtil {
   @SuppressWarnings("resource")
   public ByteBuffer buildCellBlock(final Codec codec, final CompressionCodec compressor,
       final CellScanner cellScanner)
   throws IOException {
+    return buildCellBlock(codec, compressor, cellScanner, null);
+  }
+
+  /**
+   * Puts CellScanner Cells into a cell block using passed in <code>codec</code> and/or
+   * <code>compressor</code>.
+   * @param codec
+   * @param compressor
+   * @param cellScanner
+   * @param bb ByteBuffer to reuse; can be null if no reuse is going on.
+   * @return Null or byte buffer filled with a cellblock filled with passed-in Cells encoded using
+   * passed in <code>codec</code> and/or <code>compressor</code>; the returned buffer has been
+   * flipped and is ready for reading.  Use limit to find total size.
+   * @throws IOException
+   */
+  @SuppressWarnings("resource")
+  public ByteBuffer buildCellBlock(final Codec codec, final CompressionCodec compressor,
+      final CellScanner cellScanner, final ByteBuffer bb)
+  throws IOException {
     if (cellScanner == null) return null;
     if (codec == null) throw new CellScannerButNoCodecException();
     int bufferSize = this.cellBlockBuildingInitialBufferSize;
-    if (cellScanner instanceof HeapSize) {
-      long longSize = ((HeapSize)cellScanner).heapSize();
-      // Just make sure we don't have a size bigger than an int.
-      if (longSize > Integer.MAX_VALUE) {
-        throw new IOException("Size " + longSize + " > " + Integer.MAX_VALUE);
-      }
-      bufferSize = ClassSize.align((int)longSize);
-    } // TODO: Else, get estimate on size of buffer rather than have the buffer resize.
-    // See TestIPCUtil main for experiment where we spin through the Cells getting estimate of
-    // total size before creating the buffer.  It costs some small percentage.  If we are usually
-    // within the estimated buffer size, then the cost is not worth it.  If we are often well
-    // outside the guesstimated buffer size, the processing can be done in half the time if we
-    // go w/ the estimated size rather than let the buffer resize.
-    ByteBufferOutputStream baos = new ByteBufferOutputStream(bufferSize);
+    ByteBufferOutputStream baos = null;
+    if (bb != null) {
+      bufferSize = bb.capacity();
+      baos = new ByteBufferOutputStream(bb);
+    } else {
+      // Then we need to make our own to return.
+      if (cellScanner instanceof HeapSize) {
+        long longSize = ((HeapSize)cellScanner).heapSize();
+        // Just make sure we don't have a size bigger than an int.
+        if (longSize > Integer.MAX_VALUE) {
+          throw new IOException("Size " + longSize + " > " + Integer.MAX_VALUE);
+        }
+        bufferSize = ClassSize.align((int)longSize);
+      }
+      baos = new ByteBufferOutputStream(bufferSize);
+    }
     OutputStream os = baos;
     Compressor poolCompressor = null;
     try {
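
For orientation, a minimal sketch of how a caller drives the new overload with a pooled buffer. The method signature and the flipped/limit contract come straight from the diff above; ipcUtil, codec, compressor, cells and reservoir are assumed to already exist in the caller and are illustrative only:

    ByteBuffer recycled = reservoir.getBuffer();   // e.g. a buffer from a BoundedByteBufferPool
    ByteBuffer cellBlock = ipcUtil.buildCellBlock(codec, compressor, cells, recycled);
    if (cellBlock != null) {
      // The returned buffer is flipped; limit() is the total encoded size.  It may be a
      // different, larger buffer than 'recycled' if the pooled one turned out too small.
      int encodedSize = cellBlock.limit();
      // ... write cellBlock out to the client ...
    }
    reservoir.putBuffer(recycled);                 // hand the pooled buffer back once the response is sent

This mirrors what RpcServer does further down in this commit: it returns the buffer it got from the pool, not the buffer buildCellBlock returned, so an oversized replacement buffer is simply dropped.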

BoundedByteBufferPool.java (new file)

@@ -0,0 +1,118 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io;
import java.nio.ByteBuffer;
import java.util.NavigableMap;
import java.util.TreeMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import com.google.common.annotations.VisibleForTesting;
/**
 * Like Hadoop's ByteBufferPool, only you do not specify a desired size when getting a ByteBuffer.
 * We also keep upper bounds on ByteBuffer size and on the number of ByteBuffers we keep in the
 * pool, hence it is 'bounded' as opposed to 'elastic' as in ElasticByteBufferPool. If a ByteBuffer
 * is bigger than the configured threshold, we just let the ByteBuffer go rather than keep it
 * around. If we already hold the configured maximum number of ByteBuffers, we do not cache any
 * more either (we will log a WARN in this case).
 *
 * <p>The intended use case is a reservoir of bytebuffers that an RPC can reuse; buffers tend to
 * settle at a particular 'run' size over time, give or take a few extremes.
 *
 * <p>Thread safe.
 */
@InterfaceAudience.Private
public class BoundedByteBufferPool {
private final Log LOG = LogFactory.getLog(this.getClass());
private final class Key implements Comparable<Key> {
private final int capacity;
Key(final int capacity) {
this.capacity = capacity;
}
@Override
public int compareTo(Key that) {
if (this.capacity < that.capacity) return -1;
if (this.capacity > that.capacity) return 1;
return this.hashCode() - that.hashCode();
}
}
@VisibleForTesting
final NavigableMap<Key, ByteBuffer> buffers = new TreeMap<Key, ByteBuffer>();
private final int maxByteBufferSizeToCache;
private final int maxToCache;
// A running average that only ever rises, never recedes
private int runningAverage;
private int totalReservoirCapacity;
/**
 * @param maxByteBufferSizeToCache largest ByteBuffer we will keep in the pool; bigger ones are let go
 * @param initialByteBufferSize size to allocate while the pool has no history to average over
 * @param maxToCache maximum number of ByteBuffers to hold in the pool
 */
public BoundedByteBufferPool(final int maxByteBufferSizeToCache, final int initialByteBufferSize,
final int maxToCache) {
this.maxByteBufferSizeToCache = maxByteBufferSizeToCache;
this.runningAverage = initialByteBufferSize;
this.maxToCache = maxToCache;
}
public synchronized ByteBuffer getBuffer() {
Key key = this.buffers.isEmpty()? null: this.buffers.firstKey();
ByteBuffer bb = null;
if (key == null) {
bb = ByteBuffer.allocate(this.runningAverage);
} else {
bb = this.buffers.remove(key);
if (bb == null) throw new IllegalStateException();
bb.clear();
this.totalReservoirCapacity -= bb.capacity();
}
if (LOG.isTraceEnabled()) {
LOG.trace("runningAverage=" + this.runningAverage +
", totalCapacity=" + this.totalReservoirCapacity + ", count=" + this.buffers.size());
}
return bb;
}
public synchronized void putBuffer(ByteBuffer buffer) {
// If buffer is larger than we want to keep around, just let it go.
if (buffer.capacity() > this.maxByteBufferSizeToCache) return;
// futureSize is how many byte buffers the reservoir will have if this method succeeds.
int futureSize = this.buffers.size() + 1;
if (futureSize > this.maxToCache) {
// If at max size, something is wrong. WARN.
if (LOG.isWarnEnabled()) LOG.warn("At capacity: " + futureSize);
return;
}
this.totalReservoirCapacity += buffer.capacity();
int average = this.totalReservoirCapacity / futureSize;
if (average > this.runningAverage && average < this.maxByteBufferSizeToCache) {
this.runningAverage = average;
}
this.buffers.put(new Key(buffer.capacity()), buffer);
}
}
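
A short usage sketch of the pool in isolation may help; the sizes below are arbitrary, and the try/finally is only the suggested discipline, not something the class enforces:

    // Cache buffers of up to 1 MB each, hand out 16 KB to start with, keep at most 20 instances.
    BoundedByteBufferPool pool = new BoundedByteBufferPool(1024 * 1024, 16 * 1024, 20);
    ByteBuffer bb = pool.getBuffer();   // pool empty: allocates at the running average (16 KB here)
    try {
      // ... fill and drain bb ...
    } finally {
      pool.putBuffer(bb);               // cached again unless over the size cap or the instance cap
    }

Because the cached buffers live in a TreeMap keyed on capacity, getBuffer() always hands out the smallest cached buffer first (firstKey()), and the hashCode tie-break in Key.compareTo lets several buffers of the same capacity sit in the map at once.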

ByteBufferOutputStream.java

@@ -43,17 +43,32 @@ public class ByteBufferOutputStream extends OutputStream {
   }
 
   public ByteBufferOutputStream(int capacity, boolean useDirectByteBuffer) {
-    if (useDirectByteBuffer) {
-      buf = ByteBuffer.allocateDirect(capacity);
-    } else {
-      buf = ByteBuffer.allocate(capacity);
-    }
+    this(allocate(capacity, useDirectByteBuffer));
+  }
+
+  /**
+   * @param bb ByteBuffer to use. If too small, it will be discarded and a new one allocated in
+   * its place; i.e. the passed in BB may be DESTROYED!!! Minimally it will be altered. If you
+   * want to obtain the newly allocated ByteBuffer, you'll need to pick it up when done with this
+   * instance by calling {@link #getByteBuffer()}. All this encapsulation violation is so we can
+   * recycle buffers rather than allocate on each use; allocation gets expensive when buffers are
+   * big, as does letting them resize because the initial allocation was too small.
+   * @see #getByteBuffer()
+   */
+  public ByteBufferOutputStream(final ByteBuffer bb) {
+    this.buf = bb;
+    this.buf.clear();
   }
 
   public int size() {
     return buf.position();
   }
 
+  private static ByteBuffer allocate(final int capacity, final boolean useDirectByteBuffer) {
+    return useDirectByteBuffer? ByteBuffer.allocateDirect(capacity): ByteBuffer.allocate(capacity);
+  }
+
   /**
    * This flips the underlying BB so be sure to use it _last_!
    * @return ByteBuffer
@@ -70,12 +85,7 @@ public class ByteBufferOutputStream extends OutputStream {
       int newSize = (int)Math.min((((long)buf.capacity()) * 2),
           (long)(Integer.MAX_VALUE));
       newSize = Math.max(newSize, buf.position() + extra);
-      ByteBuffer newBuf = null;
-      if (buf.isDirect()) {
-        newBuf = ByteBuffer.allocateDirect(newSize);
-      } else {
-        newBuf = ByteBuffer.allocate(newSize);
-      }
+      ByteBuffer newBuf = allocate(newSize, buf.isDirect());
       buf.flip();
       newBuf.put(buf);
       buf = newBuf;
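
The caveat in that new constructor's javadoc is worth seeing in code: once the stream outgrows the buffer you handed it, the growth path in the second hunk swaps in a bigger buffer and the original reference goes stale. A minimal sketch, with an intentionally tiny buffer:

    ByteBuffer small = ByteBuffer.allocate(8);
    ByteBufferOutputStream bbos = new ByteBufferOutputStream(small);
    bbos.write(Bytes.toBytes("more than eight bytes"));  // forces a grow; 'small' is now stale
    ByteBuffer current = bbos.getByteBuffer();           // flipped and ready for reading
    assert current != small && current.capacity() > small.capacity();

The TestByteBufferOutputStream added below exercises the opposite case: when the writes fit, the very buffer passed in comes back from getByteBuffer().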

TestByteBufferResevoir.java (new file)

@@ -0,0 +1,107 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io;
import static org.junit.Assert.*;
import java.nio.ByteBuffer;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category({ IOTests.class, SmallTests.class })
public class TestByteBufferResevoir {
final int maxByteBufferSizeToCache = 10;
final int initialByteBufferSize = 1;
final int maxToCache = 10;
BoundedByteBufferPool reservoir;
@Before
public void before() {
this.reservoir =
new BoundedByteBufferPool(maxByteBufferSizeToCache, initialByteBufferSize, maxToCache);
}
@After
public void after() {
this.reservoir = null;
}
@Test
public void testGetPut() {
ByteBuffer bb = this.reservoir.getBuffer();
assertEquals(initialByteBufferSize, bb.capacity());
assertEquals(0, this.reservoir.buffers.size());
this.reservoir.putBuffer(bb);
assertEquals(1, this.reservoir.buffers.size());
// Now remove a buffer and don't put it back so reservoir is empty.
this.reservoir.getBuffer();
assertEquals(0, this.reservoir.buffers.size());
// Try adding in a buffer with a bigger-than-initial size and see if our runningAverage works.
// Need to add then remove, then get a new bytebuffer so reservoir internally is doing
// allocation
final int newCapacity = 2;
this.reservoir.putBuffer(ByteBuffer.allocate(newCapacity));
assertEquals(1, reservoir.buffers.size());
this.reservoir.getBuffer();
assertEquals(0, this.reservoir.buffers.size());
bb = this.reservoir.getBuffer();
assertEquals(newCapacity, bb.capacity());
// Assert that adding a too-big buffer won't happen
assertEquals(0, this.reservoir.buffers.size());
this.reservoir.putBuffer(ByteBuffer.allocate(maxByteBufferSizeToCache * 2));
assertEquals(0, this.reservoir.buffers.size());
// Assert we can't add more than max allowed instances.
for (int i = 0; i < maxToCache; i++) {
this.reservoir.putBuffer(ByteBuffer.allocate(initialByteBufferSize));
}
assertEquals(maxToCache, this.reservoir.buffers.size());
}
@Test
public void testComesOutSmallestFirst() {
// Put in bbs that are sized 1-5 in random order. Put in a few of size 2 and make sure they
// each come out too.
this.reservoir.putBuffer(ByteBuffer.allocate(5));
assertEquals(1, this.reservoir.buffers.size());
this.reservoir.putBuffer(ByteBuffer.allocate(2));
assertEquals(2, this.reservoir.buffers.size());
this.reservoir.putBuffer(ByteBuffer.allocate(2));
assertEquals(3, this.reservoir.buffers.size());
this.reservoir.putBuffer(ByteBuffer.allocate(3));
assertEquals(4, this.reservoir.buffers.size());
this.reservoir.putBuffer(ByteBuffer.allocate(1));
assertEquals(5, this.reservoir.buffers.size());
this.reservoir.putBuffer(ByteBuffer.allocate(2));
assertEquals(6, this.reservoir.buffers.size());
this.reservoir.putBuffer(ByteBuffer.allocate(4));
assertEquals(7, this.reservoir.buffers.size());
// Now get them out and they should come out smallest first.
assertEquals(1, this.reservoir.getBuffer().capacity());
assertEquals(2, this.reservoir.getBuffer().capacity());
assertEquals(2, this.reservoir.getBuffer().capacity());
assertEquals(2, this.reservoir.getBuffer().capacity());
assertEquals(3, this.reservoir.getBuffer().capacity());
assertEquals(4, this.reservoir.getBuffer().capacity());
assertEquals(5, this.reservoir.getBuffer().capacity());
}
}

RpcServer.java

@@ -82,6 +82,7 @@ import org.apache.hadoop.hbase.client.Operation;
 import org.apache.hadoop.hbase.codec.Codec;
 import org.apache.hadoop.hbase.exceptions.RegionMovedException;
 import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
+import org.apache.hadoop.hbase.io.BoundedByteBufferPool;
 import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.CellBlockMeta;
@@ -267,6 +268,9 @@ public class RpcServer implements RpcServerInterface {
 
   private UserProvider userProvider;
 
+  private final BoundedByteBufferPool reservoir;
+
   /**
    * Datastructure that holds all necessary to a method invocation and then afterward, carries
    * the result.
@@ -293,6 +297,7 @@ public class RpcServer implements RpcServerInterface {
     protected long size; // size of current call
     protected boolean isError;
     protected TraceInfo tinfo;
+    private ByteBuffer recycledByteBuffer = null;
 
     Call(int id, final BlockingService service, final MethodDescriptor md, RequestHeader header,
         Message param, CellScanner cellScanner, Connection connection, Responder responder,
@@ -313,6 +318,19 @@ public class RpcServer implements RpcServerInterface {
       this.tinfo = tinfo;
     }
 
+    /**
+     * Call is done. Execution happened and we returned results to client. It is now safe to
+     * cleanup.
+     */
+    void done() {
+      if (this.recycledByteBuffer != null) {
+        // Return buffer to reservoir now we are done with it.
+        reservoir.putBuffer(this.recycledByteBuffer);
+        this.recycledByteBuffer = null;
+      }
+      this.connection.decRpcCount();  // Say that we're done with this call.
+    }
+
     @Override
     public String toString() {
       return toShortString() + " param: " +
@@ -375,8 +393,9 @@ public class RpcServer implements RpcServerInterface {
         // Set the exception as the result of the method invocation.
         headerBuilder.setException(exceptionBuilder.build());
       }
-      ByteBuffer cellBlock =
-        ipcUtil.buildCellBlock(this.connection.codec, this.connection.compressionCodec, cells);
+      this.recycledByteBuffer = reservoir.getBuffer();
+      ByteBuffer cellBlock = ipcUtil.buildCellBlock(this.connection.codec,
+        this.connection.compressionCodec, cells, recycledByteBuffer);
       if (cellBlock != null) {
         CellBlockMeta.Builder cellBlockBuilder = CellBlockMeta.newBuilder();
         // Presumes the cellBlock bytebuffer has been flipped so limit has total size in it.
@@ -1051,7 +1070,7 @@ public class RpcServer implements RpcServerInterface {
         }
 
         if (!call.response.hasRemaining()) {
-          call.connection.decRpcCount();  // Say that we're done with this call.
+          call.done();
           return true;
         } else {
           return false; // Socket can't take more, we will have to come back.
@@ -1885,7 +1904,13 @@ public class RpcServer implements RpcServerInterface {
       final InetSocketAddress bindAddress, Configuration conf,
       RpcScheduler scheduler)
       throws IOException {
+    this.reservoir = new BoundedByteBufferPool(
+      conf.getInt("hbase.ipc.server.reservoir.max.buffer.size", 1024 * 1024),
+      conf.getInt("hbase.ipc.server.reservoir.initial.buffer.size", 16 * 1024),
+      // Make the max twice the number of handlers to be safe.
+      conf.getInt("hbase.ipc.server.reservoir.initial.max",
+        conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
+          HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT) * 2));
     this.server = server;
     this.services = services;
     this.bindAddress = bindAddress;
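
The reservoir is sized by three new configuration keys read in the constructor above. As a rough, illustrative example of overriding the defaults programmatically (the same keys can be set in hbase-site.xml; the values here are made up, not recommendations):

    Configuration conf = HBaseConfiguration.create();
    // Largest cellblock buffer the reservoir will keep around (the constructor defaults to 1 MB).
    conf.setInt("hbase.ipc.server.reservoir.max.buffer.size", 2 * 1024 * 1024);
    // Buffer size handed out before the pool has built up a running average (defaults to 16 KB).
    conf.setInt("hbase.ipc.server.reservoir.initial.buffer.size", 64 * 1024);
    // Cap on pooled buffer instances; defaults to twice the region server handler count.
    conf.setInt("hbase.ipc.server.reservoir.initial.max", 60);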

TestByteBufferOutputStream.java (new file)

@@ -0,0 +1,46 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io;
import static org.junit.Assert.*;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Test;
public class TestByteBufferOutputStream {
@Test
public void testByteBufferReuse() throws IOException {
byte [] someBytes = Bytes.toBytes("some bytes");
ByteBuffer bb = ByteBuffer.allocate(16);
ByteBuffer bbToReuse = write(bb, someBytes);
bbToReuse = write(bbToReuse, Bytes.toBytes("less"));
assertTrue(bb == bbToReuse);
}
private ByteBuffer write(final ByteBuffer bb, final byte [] bytes) throws IOException {
try (ByteBufferOutputStream bbos = new ByteBufferOutputStream(bb)) {
bbos.write(bytes);
assertTrue(Bytes.compareTo(bytes, bbos.toByteArray(0, bytes.length)) == 0);
bbos.flush();
return bbos.getByteBuffer();
}
}
}