From 647613b308c737a79159ed6f30c5278d3594bdb1 Mon Sep 17 00:00:00 2001 From: chenxu14 <47170471+chenxu14@users.noreply.github.com> Date: Mon, 2 Sep 2019 13:01:36 +0800 Subject: [PATCH] HBASE-22905 Avoid temp ByteBuffer allocation in (#538) BlockingRpcConnection#writeRequest --- .../hbase/ipc/BlockingRpcConnection.java | 68 +++++++++++-------- .../org/apache/hadoop/hbase/ipc/IPCUtil.java | 11 ++- 2 files changed, 43 insertions(+), 36 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java index b152a1dbe33..ee0aeeaad2f 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java @@ -35,7 +35,6 @@ import java.io.OutputStream; import java.net.Socket; import java.net.SocketTimeoutException; import java.net.UnknownHostException; -import java.nio.ByteBuffer; import java.security.PrivilegedExceptionAction; import java.util.ArrayDeque; import java.util.Locale; @@ -70,6 +69,8 @@ import org.slf4j.LoggerFactory; import org.apache.hbase.thirdparty.com.google.protobuf.Message; import org.apache.hbase.thirdparty.com.google.protobuf.Message.Builder; import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback; +import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf; +import org.apache.hbase.thirdparty.io.netty.buffer.PooledByteBufAllocator; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.CellBlockMeta; @@ -599,37 +600,44 @@ class BlockingRpcConnection extends RpcConnection implements Runnable { * @see #readResponse() */ private void writeRequest(Call call) throws IOException { - ByteBuffer cellBlock = this.rpcClient.cellBlockBuilder.buildCellBlock(this.codec, - this.compressor, call.cells); - CellBlockMeta cellBlockMeta; - if (cellBlock != null) { - cellBlockMeta = CellBlockMeta.newBuilder().setLength(cellBlock.limit()).build(); - } else { - cellBlockMeta = null; - } - RequestHeader requestHeader = buildRequestHeader(call, cellBlockMeta); - - setupIOstreams(); - - // Now we're going to write the call. We take the lock, then check that the connection - // is still valid, and, if so we do the write to the socket. If the write fails, we don't - // know where we stand, we have to close the connection. - if (Thread.interrupted()) { - throw new InterruptedIOException(); - } - - calls.put(call.id, call); // We put first as we don't want the connection to become idle. - // from here, we do not throw any exception to upper layer as the call has been tracked in the - // pending calls map. + ByteBuf cellBlock = null; try { - call.callStats.setRequestSizeBytes(write(this.out, requestHeader, call.param, cellBlock)); - } catch (Throwable t) { - if(LOG.isTraceEnabled()) { - LOG.trace("Error while writing call, call_id:" + call.id, t); + cellBlock = this.rpcClient.cellBlockBuilder.buildCellBlock(this.codec, this.compressor, + call.cells, PooledByteBufAllocator.DEFAULT); + CellBlockMeta cellBlockMeta; + if (cellBlock != null) { + cellBlockMeta = CellBlockMeta.newBuilder().setLength(cellBlock.readableBytes()).build(); + } else { + cellBlockMeta = null; + } + RequestHeader requestHeader = buildRequestHeader(call, cellBlockMeta); + + setupIOstreams(); + + // Now we're going to write the call. We take the lock, then check that the connection + // is still valid, and, if so we do the write to the socket. If the write fails, we don't + // know where we stand, we have to close the connection. + if (Thread.interrupted()) { + throw new InterruptedIOException(); + } + + calls.put(call.id, call); // We put first as we don't want the connection to become idle. + // from here, we do not throw any exception to upper layer as the call has been tracked in + // the pending calls map. + try { + call.callStats.setRequestSizeBytes(write(this.out, requestHeader, call.param, cellBlock)); + } catch (Throwable t) { + if(LOG.isTraceEnabled()) { + LOG.trace("Error while writing call, call_id:" + call.id, t); + } + IOException e = IPCUtil.toIOE(t); + closeConn(e); + return; + } + } finally { + if (cellBlock != null) { + cellBlock.release(); } - IOException e = IPCUtil.toIOE(t); - closeConn(e); - return; } notifyAll(); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java index c6bbd0ba5eb..46562c514fb 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java @@ -23,7 +23,6 @@ import java.lang.reflect.InvocationTargetException; import java.net.ConnectException; import java.net.InetSocketAddress; import java.net.SocketTimeoutException; -import java.nio.ByteBuffer; import java.nio.channels.ClosedChannelException; import java.util.concurrent.TimeoutException; import org.apache.hadoop.hbase.DoNotRetryIOException; @@ -41,7 +40,7 @@ import org.apache.yetus.audience.InterfaceAudience; import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; import org.apache.hbase.thirdparty.com.google.protobuf.CodedOutputStream; import org.apache.hbase.thirdparty.com.google.protobuf.Message; - +import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf; import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.CellBlockMeta; import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ExceptionResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader; @@ -62,19 +61,19 @@ class IPCUtil { * @throws IOException if write action fails */ public static int write(final OutputStream dos, final Message header, final Message param, - final ByteBuffer cellBlock) throws IOException { + final ByteBuf cellBlock) throws IOException { // Must calculate total size and write that first so other side can read it all in in one // swoop. This is dictated by how the server is currently written. Server needs to change // if we are to be able to write without the length prefixing. int totalSize = IPCUtil.getTotalSizeWhenWrittenDelimited(header, param); if (cellBlock != null) { - totalSize += cellBlock.remaining(); + totalSize += cellBlock.readableBytes(); } return write(dos, header, param, cellBlock, totalSize); } private static int write(final OutputStream dos, final Message header, final Message param, - final ByteBuffer cellBlock, final int totalSize) throws IOException { + final ByteBuf cellBlock, final int totalSize) throws IOException { // I confirmed toBytes does same as DataOutputStream#writeInt. dos.write(Bytes.toBytes(totalSize)); // This allocates a buffer that is the size of the message internally. @@ -83,7 +82,7 @@ class IPCUtil { param.writeDelimitedTo(dos); } if (cellBlock != null) { - dos.write(cellBlock.array(), 0, cellBlock.remaining()); + cellBlock.readBytes(dos, cellBlock.readableBytes()); } dos.flush(); return totalSize;