From 1eae9aeeac0d20f84902455f14047580f86327f2 Mon Sep 17 00:00:00 2001 From: Ramkrishna Date: Wed, 26 Oct 2016 14:33:49 +0530 Subject: [PATCH] HBASE-16783 Use ByteBufferPool for the header and message during Rpc response (Ram) --- .../hbase/io/ByteBufferListOutputStream.java | 4 ++ .../apache/hadoop/hbase/ipc/RpcServer.java | 65 +++++++++++++++---- 2 files changed, 55 insertions(+), 14 deletions(-) diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferListOutputStream.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferListOutputStream.java index b4c00c6b0fe..c334a5a7d9c 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferListOutputStream.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferListOutputStream.java @@ -134,6 +134,10 @@ public class ByteBufferListOutputStream extends ByteBufferOutputStream { throw new UnsupportedOperationException(); } + /** + * We can be assured that the buffers returned by this method are all flipped + * @return list of bytebuffers + */ public List getByteBuffers() { if (!this.lastBufFlipped) { this.lastBufFlipped = true; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java index 00c7254944c..7bcf3a73039 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java @@ -119,6 +119,7 @@ import org.apache.hadoop.hbase.security.SaslUtil; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.UserProvider; import org.apache.hadoop.hbase.security.token.AuthenticationTokenSecretManager; +import org.apache.hadoop.hbase.util.ByteBufferUtils; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Threads; @@ -480,7 +481,8 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { headerBuilder.setCellBlockMeta(cellBlockBuilder.build()); } Message header = headerBuilder.build(); - byte[] b = createHeaderAndMessageBytes(result, header, cellBlockSize); + ByteBuffer headerBuf = + createHeaderAndMessageBytes(result, header, cellBlockSize, cellBlock); ByteBuffer[] responseBufs = null; int cellBlockBufferSize = 0; if (cellBlock != null) { @@ -489,7 +491,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { } else { responseBufs = new ByteBuffer[1]; } - responseBufs[0] = ByteBuffer.wrap(b); + responseBufs[0] = headerBuf; if (cellBlock != null) { for (int i = 0; i < cellBlockBufferSize; i++) { responseBufs[i + 1] = cellBlock.get(i); @@ -533,10 +535,17 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { headerBuilder.setException(exceptionBuilder.build()); } - private byte[] createHeaderAndMessageBytes(Message result, Message header, int cellBlockSize) - throws IOException { + private ByteBuffer createHeaderAndMessageBytes(Message result, Message header, + int cellBlockSize, List cellBlock) throws IOException { // Organize the response as a set of bytebuffers rather than collect it all together inside // one big byte array; save on allocations. + // for writing the header, we check if there is available space in the buffers + // created for the cellblock itself. If there is space for the header, we reuse + // the last buffer in the cellblock. This applies to the cellblock created from the + // pool or even the onheap cellblock buffer in case there is no pool enabled. + // Possible reuse would avoid creating a temporary array for storing the header every time. + ByteBuffer possiblePBBuf = + (cellBlockSize > 0) ? cellBlock.get(cellBlock.size() - 1) : null; int headerSerializedSize = 0, resultSerializedSize = 0, headerVintSize = 0, resultVintSize = 0; if (header != null) { @@ -551,15 +560,36 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { int totalSize = headerSerializedSize + headerVintSize + (resultSerializedSize + resultVintSize) + cellBlockSize; - // The byte[] should also hold the totalSize of the header, message and the cellblock - byte[] b = new byte[headerSerializedSize + headerVintSize + resultSerializedSize - + resultVintSize + Bytes.SIZEOF_INT]; - // The RpcClient expects the int to be in a format that code be decoded by - // the DataInputStream#readInt(). Hence going with the Bytes.toBytes(int) - // form of writing int. - Bytes.putInt(b, 0, totalSize); - CodedOutputStream cos = CodedOutputStream.newInstance(b, Bytes.SIZEOF_INT, - b.length - Bytes.SIZEOF_INT); + int totalPBSize = headerSerializedSize + headerVintSize + resultSerializedSize + + resultVintSize + Bytes.SIZEOF_INT; + // Only if the last buffer has enough space for header use it. Else allocate + // a new buffer. Assume they are all flipped + if (possiblePBBuf != null + && possiblePBBuf.limit() + totalPBSize <= possiblePBBuf.capacity()) { + // duplicate the buffer. This is where the header is going to be written + ByteBuffer pbBuf = possiblePBBuf.duplicate(); + // get the current limit + int limit = pbBuf.limit(); + // Position such that we write the header to the end of the buffer + pbBuf.position(limit); + // limit to the header size + pbBuf.limit(totalPBSize + limit); + // mark the current position + pbBuf.mark(); + writeToCOS(result, header, totalSize, pbBuf); + // reset the buffer back to old position + pbBuf.reset(); + return pbBuf; + } else { + return createHeaderAndMessageBytes(result, header, totalSize, totalPBSize); + } + } + + private void writeToCOS(Message result, Message header, int totalSize, ByteBuffer pbBuf) + throws IOException { + ByteBufferUtils.putInt(pbBuf, totalSize); + // create COS that works on BB + CodedOutputStream cos = CodedOutputStream.newInstance(pbBuf); if (header != null) { cos.writeMessageNoTag(header); } @@ -568,7 +598,14 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { } cos.flush(); cos.checkNoSpaceLeft(); - return b; + } + + private ByteBuffer createHeaderAndMessageBytes(Message result, Message header, + int totalSize, int totalPBSize) throws IOException { + ByteBuffer pbBuf = ByteBuffer.allocate(totalPBSize); + writeToCOS(result, header, totalSize, pbBuf); + pbBuf.flip(); + return pbBuf; } private BufferChain wrapWithSasl(BufferChain bc)