From 74837d9b1f13b42415395e08ff168aeed0c21676 Mon Sep 17 00:00:00 2001 From: stack Date: Wed, 4 Mar 2015 22:58:02 -0800 Subject: [PATCH] Revert " HBASE-13142 [PERF] Reuse the IPCUtil#buildCellBlock buffer" OVERCOMMIT. REVERT. This reverts commit 55f8f56ad28f71a893acb1e5993689499134a018. --- .../org/apache/hadoop/hbase/ipc/IPCUtil.java | 48 ++----- .../hbase/io/BoundedByteBufferPool.java | 118 ------------------ .../hbase/io/ByteBufferOutputStream.java | 32 ++--- .../hbase/io/TestByteBufferResevoir.java | 107 ---------------- .../apache/hadoop/hbase/ipc/RpcServer.java | 33 +---- .../hbase/io/TestByteBufferOutputStream.java | 46 ------- 6 files changed, 28 insertions(+), 356 deletions(-) delete mode 100644 hbase-common/src/main/java/org/apache/hadoop/hbase/io/BoundedByteBufferPool.java delete mode 100644 hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestByteBufferResevoir.java delete mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestByteBufferOutputStream.java diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java index 63c2143b2ba..b7e7728fece 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java @@ -65,7 +65,6 @@ public class IPCUtil { this.conf = conf; this.cellBlockDecompressionMultiplier = conf.getInt("hbase.ipc.cellblock.decompression.buffersize.multiplier", 3); - // Guess that 16k is a good size for rpc buffer. Could go bigger. See the TODO below in // #buildCellBlock. this.cellBlockBuildingInitialBufferSize = @@ -91,45 +90,24 @@ public class IPCUtil { @SuppressWarnings("resource") public ByteBuffer buildCellBlock(final Codec codec, final CompressionCodec compressor, final CellScanner cellScanner) - throws IOException { - return buildCellBlock(codec, compressor, cellScanner, null); - } - - /** - * Puts CellScanner Cells into a cell block using passed in codec and/or - * compressor. - * @param codec - * @param compressor - * @param cellScanner - * @return Null or byte buffer filled with a cellblock filled with passed-in Cells encoded using - * passed in codec and/or compressor; the returned buffer has been - * flipped and is ready for reading. Use limit to find total size. - * @param bb Use this bb. Can be null if no reuse going on. - * @throws IOException - */ - @SuppressWarnings("resource") - public ByteBuffer buildCellBlock(final Codec codec, final CompressionCodec compressor, - final CellScanner cellScanner, final ByteBuffer bb) throws IOException { if (cellScanner == null) return null; if (codec == null) throw new CellScannerButNoCodecException(); int bufferSize = this.cellBlockBuildingInitialBufferSize; - ByteBufferOutputStream baos = null; - if (bb != null) { - bufferSize = bb.capacity(); - baos = new ByteBufferOutputStream(bb); - } else { - // Then we need to make our own to return. - if (cellScanner instanceof HeapSize) { - long longSize = ((HeapSize)cellScanner).heapSize(); - // Just make sure we don't have a size bigger than an int. - if (longSize > Integer.MAX_VALUE) { - throw new IOException("Size " + longSize + " > " + Integer.MAX_VALUE); - } - bufferSize = ClassSize.align((int)longSize); + if (cellScanner instanceof HeapSize) { + long longSize = ((HeapSize)cellScanner).heapSize(); + // Just make sure we don't have a size bigger than an int. + if (longSize > Integer.MAX_VALUE) { + throw new IOException("Size " + longSize + " > " + Integer.MAX_VALUE); } - baos = new ByteBufferOutputStream(bufferSize); - } + bufferSize = ClassSize.align((int)longSize); + } // TODO: Else, get estimate on size of buffer rather than have the buffer resize. + // See TestIPCUtil main for experiment where we spin through the Cells getting estimate of + // total size before creating the buffer. It costs somw small percentage. If we are usually + // within the estimated buffer size, then the cost is not worth it. If we are often well + // outside the guesstimated buffer size, the processing can be done in half the time if we + // go w/ the estimated size rather than let the buffer resize. + ByteBufferOutputStream baos = new ByteBufferOutputStream(bufferSize); OutputStream os = baos; Compressor poolCompressor = null; try { diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/BoundedByteBufferPool.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/BoundedByteBufferPool.java deleted file mode 100644 index 1ed7db0c59a..00000000000 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/BoundedByteBufferPool.java +++ /dev/null @@ -1,118 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.io; - -import java.nio.ByteBuffer; -import java.util.NavigableMap; -import java.util.TreeMap; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; - -import com.google.common.annotations.VisibleForTesting; - -/** - * Like Hadoops' ByteBufferPool only you do not specify desired size when getting a ByteBuffer. We - * also keep upper bounds on ByteBuffer size and amount of ByteBuffers we keep int the pool hence - * it is 'bounded' as opposed to 'elastic' as in ElasticByteBuffferPool If a ByteBuffer is bigger - * than a threshold, we will just let the ByteBuffer go rather than keep it around. If more - * ByteBuffers than configured maximum instances, then we do not cache either (we will log a - * WARN in this case). - * - *

The intended use case is a reservoir of bytebuffers that an RPC can reuse; buffers tend to - * achieve a particular 'run' size over time give or take a few extremes. - * - *

Thread safe. - */ -@InterfaceAudience.Private -public class BoundedByteBufferPool { - private final Log LOG = LogFactory.getLog(this.getClass()); - - private final class Key implements Comparable { - private final int capacity; - - Key(final int capacity) { - this.capacity = capacity; - } - - @Override - public int compareTo(Key that) { - if (this.capacity < that.capacity) return -1; - if (this.capacity > that.capacity) return 1; - return this.hashCode() - that.hashCode(); - } - } - - @VisibleForTesting - final NavigableMap buffers = new TreeMap(); - - private final int maxByteBufferSizeToCache; - private final int maxToCache; - // A running average only it just rises, never recedes - private int runningAverage; - private int totalReservoirCapacity; - - /** - * @param maxByteBufferSizeToCache - * @param initialByteBufferSize - * @param maxToCache - */ - public BoundedByteBufferPool(final int maxByteBufferSizeToCache, final int initialByteBufferSize, - final int maxToCache) { - this.maxByteBufferSizeToCache = maxByteBufferSizeToCache; - this.runningAverage = initialByteBufferSize; - this.maxToCache = maxToCache; - } - - public synchronized ByteBuffer getBuffer() { - Key key = this.buffers.isEmpty()? null: this.buffers.firstKey(); - ByteBuffer bb = null; - if (key == null) { - bb = ByteBuffer.allocate(this.runningAverage); - } else { - bb = this.buffers.remove(key); - if (bb == null) throw new IllegalStateException(); - bb.clear(); - this.totalReservoirCapacity -= bb.capacity(); - } - if (LOG.isTraceEnabled()) { - LOG.trace("runningAverage=" + this.runningAverage + - ", totalCapacity=" + this.totalReservoirCapacity + ", count=" + this.buffers.size()); - } - return bb; - } - - public synchronized void putBuffer(ByteBuffer buffer) { - // If buffer is larger than we want to keep around, just let it go. - if (buffer.capacity() > this.maxByteBufferSizeToCache) return; - // futureSize is how many byte buffers the reservoir will have if this method succeeds. - int futureSize = this.buffers.size() + 1; - if (futureSize > this.maxToCache) { - // If at max size, something is wrong. WARN. - if (LOG.isWarnEnabled()) LOG.warn("At capacity: " + futureSize); - return; - } - this.totalReservoirCapacity += buffer.capacity(); - int average = this.totalReservoirCapacity / futureSize; - if (average > this.runningAverage && average < this.maxByteBufferSizeToCache) { - this.runningAverage = average; - } - this.buffers.put(new Key(buffer.capacity()), buffer); - } -} \ No newline at end of file diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferOutputStream.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferOutputStream.java index eee5866540a..257b850572d 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferOutputStream.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferOutputStream.java @@ -43,32 +43,17 @@ public class ByteBufferOutputStream extends OutputStream { } public ByteBufferOutputStream(int capacity, boolean useDirectByteBuffer) { - this(allocate(capacity, useDirectByteBuffer)); - } - - /** - * @param bb ByteBuffer to use. If too small, will be discarded and a new one allocated in its - * place; i.e. the passed in BB may be DESTROYED!!! Minimally it will be altered. If you want - * to obtain the newly allocated ByteBuffer, you'll need to pick it up when - * done with this instance by calling {@link #getByteBuffer()}. All this encapsulation violation - * is so we can recycle buffers rather than allocate each time; it can get expensive especially - * if the buffers are big doing allocations each time or having them undergo resizing because - * initial allocation was small. - * @see #getByteBuffer() - */ - public ByteBufferOutputStream(final ByteBuffer bb) { - this.buf = bb; - this.buf.clear(); + if (useDirectByteBuffer) { + buf = ByteBuffer.allocateDirect(capacity); + } else { + buf = ByteBuffer.allocate(capacity); + } } public int size() { return buf.position(); } - private static ByteBuffer allocate(final int capacity, final boolean useDirectByteBuffer) { - return useDirectByteBuffer? ByteBuffer.allocateDirect(capacity): ByteBuffer.allocate(capacity); - } - /** * This flips the underlying BB so be sure to use it _last_! * @return ByteBuffer @@ -85,7 +70,12 @@ public class ByteBufferOutputStream extends OutputStream { int newSize = (int)Math.min((((long)buf.capacity()) * 2), (long)(Integer.MAX_VALUE)); newSize = Math.max(newSize, buf.position() + extra); - ByteBuffer newBuf = allocate(newSize, buf.isDirect()); + ByteBuffer newBuf = null; + if (buf.isDirect()) { + newBuf = ByteBuffer.allocateDirect(newSize); + } else { + newBuf = ByteBuffer.allocate(newSize); + } buf.flip(); newBuf.put(buf); buf = newBuf; diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestByteBufferResevoir.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestByteBufferResevoir.java deleted file mode 100644 index c84781316f0..00000000000 --- a/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestByteBufferResevoir.java +++ /dev/null @@ -1,107 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.io; - -import static org.junit.Assert.*; - -import java.nio.ByteBuffer; - -import org.apache.hadoop.hbase.testclassification.IOTests; -import org.apache.hadoop.hbase.testclassification.SmallTests; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.junit.experimental.categories.Category; - -@Category({ IOTests.class, SmallTests.class }) -public class TestByteBufferResevoir { - final int maxByteBufferSizeToCache = 10; - final int initialByteBufferSize = 1; - final int maxToCache = 10; - BoundedByteBufferPool reservoir; - - @Before - public void before() { - this.reservoir = - new BoundedByteBufferPool(maxByteBufferSizeToCache, initialByteBufferSize, maxToCache); - } - - @After - public void after() { - this.reservoir = null; - } - - @Test - public void testGetPut() { - ByteBuffer bb = this.reservoir.getBuffer(); - assertEquals(initialByteBufferSize, bb.capacity()); - assertEquals(0, this.reservoir.buffers.size()); - this.reservoir.putBuffer(bb); - assertEquals(1, this.reservoir.buffers.size()); - // Now remove a buffer and don't put it back so reservoir is empty. - this.reservoir.getBuffer(); - assertEquals(0, this.reservoir.buffers.size()); - // Try adding in a buffer with a bigger-than-initial size and see if our runningAverage works. - // Need to add then remove, then get a new bytebuffer so reservoir internally is doing - // allocation - final int newCapacity = 2; - this.reservoir.putBuffer(ByteBuffer.allocate(newCapacity)); - assertEquals(1, reservoir.buffers.size()); - this.reservoir.getBuffer(); - assertEquals(0, this.reservoir.buffers.size()); - bb = this.reservoir.getBuffer(); - assertEquals(newCapacity, bb.capacity()); - // Assert that adding a too-big buffer won't happen - assertEquals(0, this.reservoir.buffers.size()); - this.reservoir.putBuffer(ByteBuffer.allocate(maxByteBufferSizeToCache * 2)); - assertEquals(0, this.reservoir.buffers.size()); - // Assert we can't add more than max allowed instances. - for (int i = 0; i < maxToCache; i++) { - this.reservoir.putBuffer(ByteBuffer.allocate(initialByteBufferSize)); - } - assertEquals(maxToCache, this.reservoir.buffers.size()); - } - - @Test - public void testComesOutSmallestFirst() { - // Put in bbs that are sized 1-5 in random order. Put in a few of size 2 and make sure they - // each come out too. - this.reservoir.putBuffer(ByteBuffer.allocate(5)); - assertEquals(1, this.reservoir.buffers.size()); - this.reservoir.putBuffer(ByteBuffer.allocate(2)); - assertEquals(2, this.reservoir.buffers.size()); - this.reservoir.putBuffer(ByteBuffer.allocate(2)); - assertEquals(3, this.reservoir.buffers.size()); - this.reservoir.putBuffer(ByteBuffer.allocate(3)); - assertEquals(4, this.reservoir.buffers.size()); - this.reservoir.putBuffer(ByteBuffer.allocate(1)); - assertEquals(5, this.reservoir.buffers.size()); - this.reservoir.putBuffer(ByteBuffer.allocate(2)); - assertEquals(6, this.reservoir.buffers.size()); - this.reservoir.putBuffer(ByteBuffer.allocate(4)); - assertEquals(7, this.reservoir.buffers.size()); - // Now get them out and they should come out smallest first. - assertEquals(1, this.reservoir.getBuffer().capacity()); - assertEquals(2, this.reservoir.getBuffer().capacity()); - assertEquals(2, this.reservoir.getBuffer().capacity()); - assertEquals(2, this.reservoir.getBuffer().capacity()); - assertEquals(3, this.reservoir.getBuffer().capacity()); - assertEquals(4, this.reservoir.getBuffer().capacity()); - assertEquals(5, this.reservoir.getBuffer().capacity()); - } -} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java index 13bd7b7ea0c..064771c52fd 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java @@ -82,7 +82,6 @@ import org.apache.hadoop.hbase.client.Operation; import org.apache.hadoop.hbase.codec.Codec; import org.apache.hadoop.hbase.exceptions.RegionMovedException; import org.apache.hadoop.hbase.io.ByteBufferOutputStream; -import org.apache.hadoop.hbase.io.BoundedByteBufferPool; import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.CellBlockMeta; @@ -268,9 +267,6 @@ public class RpcServer implements RpcServerInterface { private UserProvider userProvider; - private final BoundedByteBufferPool reservoir; - - /** * Datastructure that holds all necessary to a method invocation and then afterward, carries * the result. @@ -297,7 +293,6 @@ public class RpcServer implements RpcServerInterface { protected long size; // size of current call protected boolean isError; protected TraceInfo tinfo; - private ByteBuffer recycledByteBuffer = null; Call(int id, final BlockingService service, final MethodDescriptor md, RequestHeader header, Message param, CellScanner cellScanner, Connection connection, Responder responder, @@ -318,19 +313,6 @@ public class RpcServer implements RpcServerInterface { this.tinfo = tinfo; } - /** - * Call is done. Execution happened and we returned results to client. It is now safe to - * cleanup. - */ - void done() { - if (this.recycledByteBuffer != null) { - // Return buffer to reservoir now we are done with it. - reservoir.putBuffer(this.recycledByteBuffer); - this.recycledByteBuffer = null; - } - this.connection.decRpcCount(); // Say that we're done with this call. - } - @Override public String toString() { return toShortString() + " param: " + @@ -393,9 +375,8 @@ public class RpcServer implements RpcServerInterface { // Set the exception as the result of the method invocation. headerBuilder.setException(exceptionBuilder.build()); } - this.recycledByteBuffer = reservoir.getBuffer(); - ByteBuffer cellBlock = ipcUtil.buildCellBlock(this.connection.codec, - this.connection.compressionCodec, cells, recycledByteBuffer); + ByteBuffer cellBlock = + ipcUtil.buildCellBlock(this.connection.codec, this.connection.compressionCodec, cells); if (cellBlock != null) { CellBlockMeta.Builder cellBlockBuilder = CellBlockMeta.newBuilder(); // Presumes the cellBlock bytebuffer has been flipped so limit has total size in it. @@ -1070,7 +1051,7 @@ public class RpcServer implements RpcServerInterface { } if (!call.response.hasRemaining()) { - call.done(); + call.connection.decRpcCount(); // Say that we're done with this call. return true; } else { return false; // Socket can't take more, we will have to come back. @@ -1904,13 +1885,7 @@ public class RpcServer implements RpcServerInterface { final InetSocketAddress bindAddress, Configuration conf, RpcScheduler scheduler) throws IOException { - this.reservoir = new BoundedByteBufferPool( - conf.getInt("hbase.ipc.server.reservoir.max.buffer.size", 1024 * 1024), - conf.getInt("hbase.ipc.server.reservoir.initial.buffer.size", 16 * 1024), - // Make the max twice the number of handlers to be safe. - conf.getInt("hbase.ipc.server.reservoir.initial.max", - conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT, - HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT) * 2)); + this.server = server; this.services = services; this.bindAddress = bindAddress; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestByteBufferOutputStream.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestByteBufferOutputStream.java deleted file mode 100644 index e39b725beb1..00000000000 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestByteBufferOutputStream.java +++ /dev/null @@ -1,46 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.io; - -import static org.junit.Assert.*; - -import java.io.IOException; -import java.nio.ByteBuffer; - -import org.apache.hadoop.hbase.util.Bytes; -import org.junit.Test; - -public class TestByteBufferOutputStream { - @Test - public void testByteBufferReuse() throws IOException { - Bytes.toBytes("some bytes"); - ByteBuffer bb = ByteBuffer.allocate(16); - ByteBuffer bbToReuse = write(bb, Bytes.toBytes("some bytes")); - bbToReuse = write(bbToReuse, Bytes.toBytes("less")); - assertTrue(bb == bbToReuse); - } - - private ByteBuffer write(final ByteBuffer bb, final byte [] bytes) throws IOException { - try (ByteBufferOutputStream bbos = new ByteBufferOutputStream(bb)) { - bbos.write(bytes); - assertTrue(Bytes.compareTo(bytes, bbos.toByteArray(0, bytes.length)) == 0); - bbos.flush(); - return bbos.getByteBuffer(); - } - } -} \ No newline at end of file