HBASE-16783 Use ByteBufferPool for the header and message during Rpc

response (Ram)
This commit is contained in:
Ramkrishna 2016-10-26 14:33:49 +05:30
parent c7c45f2c85
commit 1eae9aeeac
2 changed files with 55 additions and 14 deletions

View File

@ -134,6 +134,10 @@ public class ByteBufferListOutputStream extends ByteBufferOutputStream {
throw new UnsupportedOperationException();
}
/**
* We can be assured that the buffers returned by this method are all flipped
* @return list of bytebuffers
*/
public List<ByteBuffer> getByteBuffers() {
if (!this.lastBufFlipped) {
this.lastBufFlipped = true;

View File

@ -119,6 +119,7 @@ import org.apache.hadoop.hbase.security.SaslUtil;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.security.token.AuthenticationTokenSecretManager;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
@ -480,7 +481,8 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
headerBuilder.setCellBlockMeta(cellBlockBuilder.build());
}
Message header = headerBuilder.build();
byte[] b = createHeaderAndMessageBytes(result, header, cellBlockSize);
ByteBuffer headerBuf =
createHeaderAndMessageBytes(result, header, cellBlockSize, cellBlock);
ByteBuffer[] responseBufs = null;
int cellBlockBufferSize = 0;
if (cellBlock != null) {
@ -489,7 +491,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
} else {
responseBufs = new ByteBuffer[1];
}
responseBufs[0] = ByteBuffer.wrap(b);
responseBufs[0] = headerBuf;
if (cellBlock != null) {
for (int i = 0; i < cellBlockBufferSize; i++) {
responseBufs[i + 1] = cellBlock.get(i);
@ -533,10 +535,17 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
headerBuilder.setException(exceptionBuilder.build());
}
private byte[] createHeaderAndMessageBytes(Message result, Message header, int cellBlockSize)
throws IOException {
private ByteBuffer createHeaderAndMessageBytes(Message result, Message header,
int cellBlockSize, List<ByteBuffer> cellBlock) throws IOException {
// Organize the response as a set of bytebuffers rather than collect it all together inside
// one big byte array; save on allocations.
// for writing the header, we check if there is available space in the buffers
// created for the cellblock itself. If there is space for the header, we reuse
// the last buffer in the cellblock. This applies to the cellblock created from the
// pool or even the onheap cellblock buffer in case there is no pool enabled.
// Possible reuse would avoid creating a temporary array for storing the header every time.
ByteBuffer possiblePBBuf =
(cellBlockSize > 0) ? cellBlock.get(cellBlock.size() - 1) : null;
int headerSerializedSize = 0, resultSerializedSize = 0, headerVintSize = 0,
resultVintSize = 0;
if (header != null) {
@ -551,15 +560,36 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
int totalSize = headerSerializedSize + headerVintSize
+ (resultSerializedSize + resultVintSize)
+ cellBlockSize;
// The byte[] should also hold the totalSize of the header, message and the cellblock
byte[] b = new byte[headerSerializedSize + headerVintSize + resultSerializedSize
+ resultVintSize + Bytes.SIZEOF_INT];
// The RpcClient expects the int to be in a format that code be decoded by
// the DataInputStream#readInt(). Hence going with the Bytes.toBytes(int)
// form of writing int.
Bytes.putInt(b, 0, totalSize);
CodedOutputStream cos = CodedOutputStream.newInstance(b, Bytes.SIZEOF_INT,
b.length - Bytes.SIZEOF_INT);
int totalPBSize = headerSerializedSize + headerVintSize + resultSerializedSize
+ resultVintSize + Bytes.SIZEOF_INT;
// Only if the last buffer has enough space for header use it. Else allocate
// a new buffer. Assume they are all flipped
if (possiblePBBuf != null
&& possiblePBBuf.limit() + totalPBSize <= possiblePBBuf.capacity()) {
// duplicate the buffer. This is where the header is going to be written
ByteBuffer pbBuf = possiblePBBuf.duplicate();
// get the current limit
int limit = pbBuf.limit();
// Position such that we write the header to the end of the buffer
pbBuf.position(limit);
// limit to the header size
pbBuf.limit(totalPBSize + limit);
// mark the current position
pbBuf.mark();
writeToCOS(result, header, totalSize, pbBuf);
// reset the buffer back to old position
pbBuf.reset();
return pbBuf;
} else {
return createHeaderAndMessageBytes(result, header, totalSize, totalPBSize);
}
}
private void writeToCOS(Message result, Message header, int totalSize, ByteBuffer pbBuf)
throws IOException {
ByteBufferUtils.putInt(pbBuf, totalSize);
// create COS that works on BB
CodedOutputStream cos = CodedOutputStream.newInstance(pbBuf);
if (header != null) {
cos.writeMessageNoTag(header);
}
@ -568,7 +598,14 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
}
cos.flush();
cos.checkNoSpaceLeft();
return b;
}
private ByteBuffer createHeaderAndMessageBytes(Message result, Message header,
int totalSize, int totalPBSize) throws IOException {
ByteBuffer pbBuf = ByteBuffer.allocate(totalPBSize);
writeToCOS(result, header, totalSize, pbBuf);
pbBuf.flip();
return pbBuf;
}
private BufferChain wrapWithSasl(BufferChain bc)