From fdf2ecb01382777fade9a59a13bc035975c88a3c Mon Sep 17 00:00:00 2001 From: Michael Stack Date: Wed, 17 Dec 2008 04:47:14 +0000 Subject: [PATCH] HBASE-1053 bring recent rpc changes down from hadoop git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@727277 13f79535-47bb-0310-9956-ffa450edef68 --- CHANGES.txt | 1 + .../apache/hadoop/hbase/ipc/HBaseServer.java | 86 ++++++++++++++++++- 2 files changed, 83 insertions(+), 4 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 0882f38d5cc..315fe5c4c7a 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -175,6 +175,7 @@ Release 0.19.0 - Unreleased outstanding seqnum is 162297053 fr om region -ROOT-,,0 HBASE-1055 Better vm stats on startup HBASE-1065 Minor logging improvements in the master + HBASE-1053 bring recent rpc changes down from hadoop NEW FEATURES HBASE-875 Use MurmurHash instead of JenkinsHash [in bloomfilters] diff --git a/src/java/org/apache/hadoop/hbase/ipc/HBaseServer.java b/src/java/org/apache/hadoop/hbase/ipc/HBaseServer.java index 608fdef09a4..368069caf67 100644 --- a/src/java/org/apache/hadoop/hbase/ipc/HBaseServer.java +++ b/src/java/org/apache/hadoop/hbase/ipc/HBaseServer.java @@ -37,6 +37,8 @@ import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; +import java.nio.channels.ReadableByteChannel; +import java.nio.channels.WritableByteChannel; import java.util.ArrayList; import java.util.Collections; import java.util.Iterator; @@ -587,7 +589,7 @@ public abstract class HBaseServer { // // Send as much data as we can in the non-blocking fashion // - int numBytes = channel.write(call.response); + int numBytes = channelWrite(channel, call.response); if (numBytes < 0) { return true; } @@ -762,7 +764,7 @@ public abstract class HBaseServer { */ int count = -1; if (dataLengthBuffer.remaining() > 0) { - count = channel.read(dataLengthBuffer); + count = channelRead(channel, dataLengthBuffer); if (count < 0 || dataLengthBuffer.remaining() > 0) return count; } @@ -770,7 +772,7 @@ public abstract class HBaseServer { if (!versionRead) { //Every connection is expected to send the header. ByteBuffer versionBuffer = ByteBuffer.allocate(1); - count = channel.read(versionBuffer); + count = channelRead(channel, versionBuffer); if (count <= 0) { return count; } @@ -802,7 +804,7 @@ public abstract class HBaseServer { incRpcCount(); // Increment the rpc count } - count = channel.read(data); + count = channelRead(channel, data); if (data.remaining() == 0) { dataLengthBuffer.clear(); @@ -1058,4 +1060,80 @@ public abstract class HBaseServer { return callQueue.size(); } + + /** + * When the read or write buffer size is larger than this limit, i/o will be + * done in chunks of this size. Most RPC requests and responses would be + * be smaller. + */ + private static int NIO_BUFFER_LIMIT = 8*1024; //should not be more than 64KB. + + /** + * This is a wrapper around {@link WritableByteChannel#write(ByteBuffer)}. + * If the amount of data is large, it writes to channel in smaller chunks. + * This is to avoid jdk from creating many direct buffers as the size of + * buffer increases. This also minimizes extra copies in NIO layer + * as a result of multiple write operations required to write a large + * buffer. + * + * @see WritableByteChannel#write(ByteBuffer) + */ + private static int channelWrite(WritableByteChannel channel, + ByteBuffer buffer) throws IOException { + + return (buffer.remaining() <= NIO_BUFFER_LIMIT) ? + channel.write(buffer) : channelIO(null, channel, buffer); + } + + + /** + * This is a wrapper around {@link ReadableByteChannel#read(ByteBuffer)}. + * If the amount of data is large, it writes to channel in smaller chunks. + * This is to avoid jdk from creating many direct buffers as the size of + * ByteBuffer increases. There should not be any performance degredation. + * + * @see ReadableByteChannel#read(ByteBuffer) + */ + private static int channelRead(ReadableByteChannel channel, + ByteBuffer buffer) throws IOException { + + return (buffer.remaining() <= NIO_BUFFER_LIMIT) ? + channel.read(buffer) : channelIO(channel, null, buffer); + } + + /** + * Helper for {@link #channelRead(ReadableByteChannel, ByteBuffer)} + * and {@link #channelWrite(WritableByteChannel, ByteBuffer)}. Only + * one of readCh or writeCh should be non-null. + * + * @see #channelRead(ReadableByteChannel, ByteBuffer) + * @see #channelWrite(WritableByteChannel, ByteBuffer) + */ + private static int channelIO(ReadableByteChannel readCh, + WritableByteChannel writeCh, + ByteBuffer buf) throws IOException { + + int originalLimit = buf.limit(); + int initialRemaining = buf.remaining(); + int ret = 0; + + while (buf.remaining() > 0) { + try { + int ioSize = Math.min(buf.remaining(), NIO_BUFFER_LIMIT); + buf.limit(buf.position() + ioSize); + + ret = (readCh == null) ? writeCh.write(buf) : readCh.read(buf); + + if (ret < ioSize) { + break; + } + + } finally { + buf.limit(originalLimit); + } + } + + int nBytes = initialRemaining - buf.remaining(); + return (nBytes > 0) ? nBytes : ret; + } }