From 55f8f56ad28f71a893acb1e5993689499134a018 Mon Sep 17 00:00:00 2001
From: stack
Date: Wed, 4 Mar 2015 22:44:20 -0800
Subject: [PATCH] HBASE-13142 [PERF] Reuse the IPCUtil#buildCellBlock buffer

Rename ByteBufferReservoir as BoundedByteBufferPool
---
 .../org/apache/hadoop/hbase/ipc/IPCUtil.java  |  48 +++++--
 .../hbase/io/BoundedByteBufferPool.java       | 118 ++++++++++++++++++
 .../hbase/io/ByteBufferOutputStream.java      |  32 +++--
 .../hbase/io/TestByteBufferReservoir.java     | 107 ++++++++++++++++
 .../apache/hadoop/hbase/ipc/RpcServer.java    |  33 ++++-
 .../hbase/io/TestByteBufferOutputStream.java  |  46 +++++++
 6 files changed, 356 insertions(+), 28 deletions(-)
 create mode 100644 hbase-common/src/main/java/org/apache/hadoop/hbase/io/BoundedByteBufferPool.java
 create mode 100644 hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestByteBufferReservoir.java
 create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestByteBufferOutputStream.java

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
index b7e7728fece..63c2143b2ba 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
@@ -65,6 +65,7 @@ public class IPCUtil {
     this.conf = conf;
     this.cellBlockDecompressionMultiplier =
       conf.getInt("hbase.ipc.cellblock.decompression.buffersize.multiplier", 3);
+
     // Guess that 16k is a good size for rpc buffer. Could go bigger. See the TODO below in
     // #buildCellBlock.
     this.cellBlockBuildingInitialBufferSize =
@@ -90,24 +91,45 @@ public class IPCUtil {
   @SuppressWarnings("resource")
   public ByteBuffer buildCellBlock(final Codec codec, final CompressionCodec compressor,
     final CellScanner cellScanner)
+  throws IOException {
+    return buildCellBlock(codec, compressor, cellScanner, null);
+  }
+
+  /**
+   * Puts CellScanner Cells into a cell block using the passed-in codec and/or compressor.
+   * @param codec
+   * @param compressor
+   * @param cellScanner
+   * @param bb ByteBuffer to reuse. Can be null if no reuse is going on.
+   * @return Null, or a ByteBuffer holding a cellblock: the passed-in Cells encoded using the
+   * passed-in codec and/or compressor; the returned buffer has been flipped and is ready for
+   * reading. Use limit to find total size.
+   * @throws IOException
+   */
+  @SuppressWarnings("resource")
+  public ByteBuffer buildCellBlock(final Codec codec, final CompressionCodec compressor,
+    final CellScanner cellScanner, final ByteBuffer bb)
   throws IOException {
     if (cellScanner == null) return null;
     if (codec == null) throw new CellScannerButNoCodecException();
     int bufferSize = this.cellBlockBuildingInitialBufferSize;
-    if (cellScanner instanceof HeapSize) {
-      long longSize = ((HeapSize)cellScanner).heapSize();
-      // Just make sure we don't have a size bigger than an int.
-      if (longSize > Integer.MAX_VALUE) {
-        throw new IOException("Size " + longSize + " > " + Integer.MAX_VALUE);
+    ByteBufferOutputStream baos = null;
+    if (bb != null) {
+      bufferSize = bb.capacity();
+      baos = new ByteBufferOutputStream(bb);
+    } else {
+      // Then we need to make our own to return.
+      if (cellScanner instanceof HeapSize) {
+        long longSize = ((HeapSize)cellScanner).heapSize();
+        // Just make sure we don't have a size bigger than an int.
+        if (longSize > Integer.MAX_VALUE) {
+          throw new IOException("Size " + longSize + " > " + Integer.MAX_VALUE);
+        }
+        bufferSize = ClassSize.align((int)longSize);
+      }
-      bufferSize = ClassSize.align((int)longSize);
-    }
     // TODO: Else, get estimate on size of buffer rather than have the buffer resize.
     // See TestIPCUtil main for experiment where we spin through the Cells getting estimate of
     // total size before creating the buffer. It costs some small percentage. If we are usually
     // within the estimated buffer size, then the cost is not worth it. If we are often well
     // outside the guesstimated buffer size, the processing can be done in half the time if we
     // go w/ the estimated size rather than let the buffer resize.
-    ByteBufferOutputStream baos = new ByteBufferOutputStream(bufferSize);
+      baos = new ByteBufferOutputStream(bufferSize);
+    }
     OutputStream os = baos;
     Compressor poolCompressor = null;
     try {
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/BoundedByteBufferPool.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/BoundedByteBufferPool.java
new file mode 100644
index 00000000000..1ed7db0c59a
--- /dev/null
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/BoundedByteBufferPool.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io;
+
+import java.nio.ByteBuffer;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Like Hadoop's ByteBufferPool, only you do not specify a desired size when getting a ByteBuffer.
+ * We also keep an upper bound on ByteBuffer size and on the number of ByteBuffers kept in the
+ * pool, hence it is 'bounded' as opposed to 'elastic' as in ElasticByteBufferPool. If a ByteBuffer
+ * is bigger than a threshold, we just let the ByteBuffer go rather than keep it around. If the
+ * pool already holds the configured maximum number of instances, we do not cache the ByteBuffer
+ * either (we will log a WARN in this case).
+ *
+ * <p>The intended use case is a reservoir of bytebuffers that an RPC can reuse; buffers tend to
+ * achieve a particular 'run' size over time, give or take a few extremes.
+ *
+ * <p>Thread safe.
+ */
+@InterfaceAudience.Private
+public class BoundedByteBufferPool {
+  private final Log LOG = LogFactory.getLog(this.getClass());
+
+  private final class Key implements Comparable<Key> {
+    private final int capacity;
+
+    Key(final int capacity) {
+      this.capacity = capacity;
+    }
+
+    @Override
+    public int compareTo(Key that) {
+      if (this.capacity < that.capacity) return -1;
+      if (this.capacity > that.capacity) return 1;
+      return this.hashCode() - that.hashCode();
+    }
+  }
+
+  @VisibleForTesting
+  final NavigableMap<Key, ByteBuffer> buffers = new TreeMap<Key, ByteBuffer>();
+
+  private final int maxByteBufferSizeToCache;
+  private final int maxToCache;
+  // A running average, only it just rises; it never recedes.
+  private int runningAverage;
+  private int totalReservoirCapacity;
+
+  /**
+   * @param maxByteBufferSizeToCache Discard returned ByteBuffers bigger than this size.
+   * @param initialByteBufferSize Seed for the running average; what we allocate when empty.
+   * @param maxToCache Maximum count of ByteBuffers to keep in the pool.
+   */
+  public BoundedByteBufferPool(final int maxByteBufferSizeToCache, final int initialByteBufferSize,
+      final int maxToCache) {
+    this.maxByteBufferSizeToCache = maxByteBufferSizeToCache;
+    this.runningAverage = initialByteBufferSize;
+    this.maxToCache = maxToCache;
+  }
+
+  public synchronized ByteBuffer getBuffer() {
+    Key key = this.buffers.isEmpty()? null: this.buffers.firstKey();
+    ByteBuffer bb = null;
+    if (key == null) {
+      bb = ByteBuffer.allocate(this.runningAverage);
+    } else {
+      bb = this.buffers.remove(key);
+      if (bb == null) throw new IllegalStateException();
+      bb.clear();
+      this.totalReservoirCapacity -= bb.capacity();
+    }
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("runningAverage=" + this.runningAverage +
+        ", totalCapacity=" + this.totalReservoirCapacity + ", count=" + this.buffers.size());
+    }
+    return bb;
+  }
+
+  public synchronized void putBuffer(ByteBuffer buffer) {
+    // If buffer is larger than we want to keep around, just let it go.
+    if (buffer.capacity() > this.maxByteBufferSizeToCache) return;
+    // futureSize is how many byte buffers the reservoir will have if this method succeeds.
+    int futureSize = this.buffers.size() + 1;
+    if (futureSize > this.maxToCache) {
+      // If at max size, something is wrong. WARN.
+      if (LOG.isWarnEnabled()) LOG.warn("At capacity: " + futureSize);
+      return;
+    }
+    this.totalReservoirCapacity += buffer.capacity();
+    int average = this.totalReservoirCapacity / futureSize;
+    if (average > this.runningAverage && average < this.maxByteBufferSizeToCache) {
+      this.runningAverage = average;
+    }
+    this.buffers.put(new Key(buffer.capacity()), buffer);
+  }
+}
\ No newline at end of file
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferOutputStream.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferOutputStream.java
index 257b850572d..eee5866540a 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferOutputStream.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferOutputStream.java
@@ -43,17 +43,32 @@ public class ByteBufferOutputStream extends OutputStream {
   }
 
   public ByteBufferOutputStream(int capacity, boolean useDirectByteBuffer) {
-    if (useDirectByteBuffer) {
-      buf = ByteBuffer.allocateDirect(capacity);
-    } else {
-      buf = ByteBuffer.allocate(capacity);
-    }
+    this(allocate(capacity, useDirectByteBuffer));
+  }
+
+  /**
+   * @param bb ByteBuffer to use. If too small, it will be discarded and a new one allocated in
+   * its place; i.e. the passed-in BB may be DESTROYED!!! Minimally it will be altered.
+   * If you want to obtain the newly allocated ByteBuffer, pick it up when done with this
+   * instance by calling {@link #getByteBuffer()}. All this encapsulation violation is so we can
+   * recycle buffers rather than allocate each time; allocating can get expensive, especially if
+   * buffers are big, as can having buffers undergo resizing because the initial allocation was
+   * small.
+   * @see #getByteBuffer()
+   */
+  public ByteBufferOutputStream(final ByteBuffer bb) {
+    this.buf = bb;
+    this.buf.clear();
   }
 
   public int size() {
     return buf.position();
   }
 
+  private static ByteBuffer allocate(final int capacity, final boolean useDirectByteBuffer) {
+    return useDirectByteBuffer? ByteBuffer.allocateDirect(capacity): ByteBuffer.allocate(capacity);
+  }
+
   /**
    * This flips the underlying BB so be sure to use it _last_!
    * @return ByteBuffer
@@ -70,12 +85,7 @@ public class ByteBufferOutputStream extends OutputStream {
       int newSize = (int)Math.min((((long)buf.capacity()) * 2), (long)(Integer.MAX_VALUE));
       newSize = Math.max(newSize, buf.position() + extra);
-      ByteBuffer newBuf = null;
-      if (buf.isDirect()) {
-        newBuf = ByteBuffer.allocateDirect(newSize);
-      } else {
-        newBuf = ByteBuffer.allocate(newSize);
-      }
+      ByteBuffer newBuf = allocate(newSize, buf.isDirect());
       buf.flip();
       newBuf.put(buf);
       buf = newBuf;
diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestByteBufferReservoir.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestByteBufferReservoir.java
new file mode 100644
index 00000000000..c84781316f0
--- /dev/null
+++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestByteBufferReservoir.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io;
+
+import static org.junit.Assert.*;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.hbase.testclassification.IOTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ IOTests.class, SmallTests.class })
+public class TestByteBufferReservoir {
+  final int maxByteBufferSizeToCache = 10;
+  final int initialByteBufferSize = 1;
+  final int maxToCache = 10;
+  BoundedByteBufferPool reservoir;
+
+  @Before
+  public void before() {
+    this.reservoir =
+      new BoundedByteBufferPool(maxByteBufferSizeToCache, initialByteBufferSize, maxToCache);
+  }
+
+  @After
+  public void after() {
+    this.reservoir = null;
+  }
+
+  @Test
+  public void testGetPut() {
+    ByteBuffer bb = this.reservoir.getBuffer();
+    assertEquals(initialByteBufferSize, bb.capacity());
+    assertEquals(0, this.reservoir.buffers.size());
+    this.reservoir.putBuffer(bb);
+    assertEquals(1, this.reservoir.buffers.size());
+    // Now remove a buffer and don't put it back so reservoir is empty.
+    this.reservoir.getBuffer();
+    assertEquals(0, this.reservoir.buffers.size());
+    // Try adding in a buffer with a bigger-than-initial size and see if our runningAverage works.
+    // Need to add then remove, then get a new bytebuffer, so the reservoir internally is doing
+    // an allocation.
+    final int newCapacity = 2;
+    this.reservoir.putBuffer(ByteBuffer.allocate(newCapacity));
+    assertEquals(1, reservoir.buffers.size());
+    this.reservoir.getBuffer();
+    assertEquals(0, this.reservoir.buffers.size());
+    bb = this.reservoir.getBuffer();
+    assertEquals(newCapacity, bb.capacity());
+    // Assert that a too-big buffer does not get added.
+    assertEquals(0, this.reservoir.buffers.size());
+    this.reservoir.putBuffer(ByteBuffer.allocate(maxByteBufferSizeToCache * 2));
+    assertEquals(0, this.reservoir.buffers.size());
+    // Assert we cannot add more than the max allowed instances.
+    for (int i = 0; i < maxToCache; i++) {
+      this.reservoir.putBuffer(ByteBuffer.allocate(initialByteBufferSize));
+    }
+    assertEquals(maxToCache, this.reservoir.buffers.size());
+  }
+
+  @Test
+  public void testComesOutSmallestFirst() {
+    // Put in bbs that are sized 1-5 in random order. Put in a few of size 2 and make sure they
+    // each come out too.
+    this.reservoir.putBuffer(ByteBuffer.allocate(5));
+    assertEquals(1, this.reservoir.buffers.size());
+    this.reservoir.putBuffer(ByteBuffer.allocate(2));
+    assertEquals(2, this.reservoir.buffers.size());
+    this.reservoir.putBuffer(ByteBuffer.allocate(2));
+    assertEquals(3, this.reservoir.buffers.size());
+    this.reservoir.putBuffer(ByteBuffer.allocate(3));
+    assertEquals(4, this.reservoir.buffers.size());
+    this.reservoir.putBuffer(ByteBuffer.allocate(1));
+    assertEquals(5, this.reservoir.buffers.size());
+    this.reservoir.putBuffer(ByteBuffer.allocate(2));
+    assertEquals(6, this.reservoir.buffers.size());
+    this.reservoir.putBuffer(ByteBuffer.allocate(4));
+    assertEquals(7, this.reservoir.buffers.size());
+    // Now get them out; they should come out smallest first.
+    assertEquals(1, this.reservoir.getBuffer().capacity());
+    assertEquals(2, this.reservoir.getBuffer().capacity());
+    assertEquals(2, this.reservoir.getBuffer().capacity());
+    assertEquals(2, this.reservoir.getBuffer().capacity());
+    assertEquals(3, this.reservoir.getBuffer().capacity());
+    assertEquals(4, this.reservoir.getBuffer().capacity());
+    assertEquals(5, this.reservoir.getBuffer().capacity());
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
index 064771c52fd..13bd7b7ea0c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
@@ -82,6 +82,7 @@ import org.apache.hadoop.hbase.client.Operation;
 import org.apache.hadoop.hbase.codec.Codec;
 import org.apache.hadoop.hbase.exceptions.RegionMovedException;
+import org.apache.hadoop.hbase.io.BoundedByteBufferPool;
 import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
 import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.CellBlockMeta;
@@ -267,6 +268,9 @@ public class RpcServer implements RpcServerInterface {
 
   private UserProvider userProvider;
 
+  private final BoundedByteBufferPool reservoir;
+
+
   /**
    * Datastructure that holds all necessary to a method invocation and then afterward, carries
    * the result.
@@ -293,6 +297,7 @@ public class RpcServer implements RpcServerInterface {
     protected long size;                          // size of current call
     protected boolean isError;
     protected TraceInfo tinfo;
+    private ByteBuffer recycledByteBuffer = null;
 
     Call(int id, final BlockingService service, final MethodDescriptor md, RequestHeader header,
          Message param, CellScanner cellScanner, Connection connection, Responder responder,
@@ -313,6 +318,19 @@ public class RpcServer implements RpcServerInterface {
       this.tinfo = tinfo;
     }
 
+    /**
+     * Call is done. Execution happened and we returned results to the client. It is now safe to
+     * clean up.
+     */
+    void done() {
+      if (this.recycledByteBuffer != null) {
+        // Return the buffer to the reservoir now that we are done with it.
+        reservoir.putBuffer(this.recycledByteBuffer);
+        this.recycledByteBuffer = null;
+      }
+      this.connection.decRpcCount();  // Say that we're done with this call.
+    }
+
     @Override
     public String toString() {
       return toShortString() + " param: " +
@@ -375,8 +393,9 @@ public class RpcServer implements RpcServerInterface {
         // Set the exception as the result of the method invocation.
         headerBuilder.setException(exceptionBuilder.build());
       }
-      ByteBuffer cellBlock =
-        ipcUtil.buildCellBlock(this.connection.codec, this.connection.compressionCodec, cells);
+      this.recycledByteBuffer = reservoir.getBuffer();
+      ByteBuffer cellBlock = ipcUtil.buildCellBlock(this.connection.codec,
+        this.connection.compressionCodec, cells, recycledByteBuffer);
       if (cellBlock != null) {
         CellBlockMeta.Builder cellBlockBuilder = CellBlockMeta.newBuilder();
         // Presumes the cellBlock bytebuffer has been flipped so limit has total size in it.
@@ -1051,7 +1070,7 @@ public class RpcServer implements RpcServerInterface {
       }
 
       if (!call.response.hasRemaining()) {
-        call.connection.decRpcCount();  // Say that we're done with this call.
+        call.done();
         return true;
       } else {
         return false; // Socket can't take more, we will have to come back.
@@ -1885,7 +1904,13 @@
       final InetSocketAddress bindAddress, Configuration conf,
       RpcScheduler scheduler)
       throws IOException {
-
+    this.reservoir = new BoundedByteBufferPool(
+      conf.getInt("hbase.ipc.server.reservoir.max.buffer.size", 1024 * 1024),
+      conf.getInt("hbase.ipc.server.reservoir.initial.buffer.size", 16 * 1024),
+      // Make the max twice the number of handlers to be safe.
+      conf.getInt("hbase.ipc.server.reservoir.initial.max",
+        conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
+          HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT) * 2));
     this.server = server;
     this.services = services;
     this.bindAddress = bindAddress;
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestByteBufferOutputStream.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestByteBufferOutputStream.java
new file mode 100644
index 00000000000..e39b725beb1
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestByteBufferOutputStream.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.hbase.testclassification.IOTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ IOTests.class, SmallTests.class })
+public class TestByteBufferOutputStream {
+  @Test
+  public void testByteBufferReuse() throws IOException {
+    ByteBuffer bb = ByteBuffer.allocate(16);
+    ByteBuffer bbToReuse = write(bb, Bytes.toBytes("some bytes"));
+    bbToReuse = write(bbToReuse, Bytes.toBytes("less"));
+    assertTrue(bb == bbToReuse);
+  }
+
+  private ByteBuffer write(final ByteBuffer bb, final byte [] bytes) throws IOException {
+    try (ByteBufferOutputStream bbos = new ByteBufferOutputStream(bb)) {
+      bbos.write(bytes);
+      assertTrue(Bytes.compareTo(bytes, bbos.toByteArray(0, bytes.length)) == 0);
+      bbos.flush();
+      return bbos.getByteBuffer();
+    }
+  }
+}
\ No newline at end of file
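
Note for reviewers: the round trip the patch sets up is checkout from the reservoir, write through
ByteBufferOutputStream (inside IPCUtil#buildCellBlock), then checkin on Call#done(). A minimal
sketch of that lifecycle using only the two classes touched above; the class name, sizes, and
payload are made up for illustration, and the pool parameters mirror the RpcServer defaults with a
hypothetical 30 handlers:

    import java.nio.ByteBuffer;
    import org.apache.hadoop.hbase.io.BoundedByteBufferPool;
    import org.apache.hadoop.hbase.io.ByteBufferOutputStream;

    public class ReservoirLifecycleSketch {
      public static void main(String[] args) throws Exception {
        // 1MB max cached size, 16k initial size, 60 instances (30 handlers * 2).
        BoundedByteBufferPool reservoir = new BoundedByteBufferPool(1024 * 1024, 16 * 1024, 60);

        // 1. Check a buffer out; the first call allocates at the running average (16k).
        ByteBuffer bb = reservoir.getBuffer();

        // 2. Write a response into it. If the payload outgrows bb, the stream allocates a
        //    replacement internally; this is why the caller must fetch getByteBuffer() after.
        ByteBufferOutputStream baos = new ByteBufferOutputStream(bb);
        baos.write(new byte[32 * 1024]); // hypothetical 32k cellblock payload
        ByteBuffer cellBlock = baos.getByteBuffer(); // flipped, ready for the Responder

        // 3. Once the Call is done, return the (possibly new, larger) buffer to the reservoir.
        reservoir.putBuffer(cellBlock);

        // 4. The next checkout reuses the cached 32k buffer instead of allocating.
        ByteBuffer reused = reservoir.getBuffer();
        assert reused == cellBlock;
      }
    }

Note in step 2 that the original 16k buffer is simply dropped once the stream outgrows it, which is
the "DESTROYED" caveat in the ByteBufferOutputStream(ByteBuffer) javadoc.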
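
One subtlety worth calling out: runningAverage never falls, so a burst of large responses
permanently raises the allocation size used when the pool is empty (capped by
maxByteBufferSizeToCache). A worked example with hypothetical sizes, tracing putBuffer's
arithmetic against the same constructor arguments as above:

    import java.nio.ByteBuffer;
    import org.apache.hadoop.hbase.io.BoundedByteBufferPool;

    public class RunningAverageSketch {
      public static void main(String[] args) {
        BoundedByteBufferPool pool = new BoundedByteBufferPool(1024 * 1024, 16 * 1024, 60);

        System.out.println(pool.getBuffer().capacity()); // 16384: empty pool allocates the seed average

        pool.putBuffer(ByteBuffer.allocate(64 * 1024)); // total=64k, futureSize=1, avg=64k > 16k: average rises to 64k
        pool.putBuffer(ByteBuffer.allocate(16 * 1024)); // total=80k, futureSize=2, avg=40k < 64k: average stays 64k

        System.out.println(pool.getBuffer().capacity()); // 16384: cached buffers come out smallest first
        System.out.println(pool.getBuffer().capacity()); // 65536: then the bigger cached buffer
        System.out.println(pool.getBuffer().capacity()); // 65536: empty again, allocates at the raised average
      }
    }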
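
A note on the Key class: TreeMap keeps one value per key, so if capacity alone were the comparator,
caching two buffers of the same size would silently drop one of them. The identity-hashCode
tiebreak in compareTo is what lets same-sized buffers coexist (barring an unlikely hash collision),
which is what testComesOutSmallestFirst exercises with its several size-2 buffers. A toy
demonstration, again with a hypothetical class name and sizes:

    import java.nio.ByteBuffer;
    import org.apache.hadoop.hbase.io.BoundedByteBufferPool;

    public class SameSizeBuffersSketch {
      public static void main(String[] args) {
        BoundedByteBufferPool pool = new BoundedByteBufferPool(1024 * 1024, 16 * 1024, 60);
        // Two distinct buffers with identical capacity: both are retained because the Key
        // tiebreak compares identity hash codes, not just capacities.
        pool.putBuffer(ByteBuffer.allocate(8 * 1024));
        pool.putBuffer(ByteBuffer.allocate(8 * 1024));
        ByteBuffer first = pool.getBuffer();
        ByteBuffer second = pool.getBuffer();
        System.out.println(first != second);               // true: two distinct cached instances
        System.out.println(first.capacity() == 8 * 1024);  // true
        System.out.println(second.capacity() == 8 * 1024); // true
      }
    }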