HBASE-1053 bring recent rpc changes down from hadoop

git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@727277 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Michael Stack 2008-12-17 04:47:14 +00:00
parent 053635c1fa
commit fdf2ecb013
2 changed files with 83 additions and 4 deletions

View File

@ -175,6 +175,7 @@ Release 0.19.0 - Unreleased
outstanding seqnum is 162297053 fr om region -ROOT-,,0
HBASE-1055 Better vm stats on startup
HBASE-1065 Minor logging improvements in the master
HBASE-1053 bring recent rpc changes down from hadoop
NEW FEATURES
HBASE-875 Use MurmurHash instead of JenkinsHash [in bloomfilters]

View File

@ -37,6 +37,8 @@ import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
@ -587,7 +589,7 @@ public abstract class HBaseServer {
//
// Send as much data as we can in the non-blocking fashion
//
int numBytes = channel.write(call.response);
int numBytes = channelWrite(channel, call.response);
if (numBytes < 0) {
return true;
}
@ -762,7 +764,7 @@ public abstract class HBaseServer {
*/
int count = -1;
if (dataLengthBuffer.remaining() > 0) {
count = channel.read(dataLengthBuffer);
count = channelRead(channel, dataLengthBuffer);
if (count < 0 || dataLengthBuffer.remaining() > 0)
return count;
}
@ -770,7 +772,7 @@ public abstract class HBaseServer {
if (!versionRead) {
//Every connection is expected to send the header.
ByteBuffer versionBuffer = ByteBuffer.allocate(1);
count = channel.read(versionBuffer);
count = channelRead(channel, versionBuffer);
if (count <= 0) {
return count;
}
@ -802,7 +804,7 @@ public abstract class HBaseServer {
incRpcCount(); // Increment the rpc count
}
count = channel.read(data);
count = channelRead(channel, data);
if (data.remaining() == 0) {
dataLengthBuffer.clear();
@ -1058,4 +1060,80 @@ public abstract class HBaseServer {
return callQueue.size();
}
/**
* When the read or write buffer size is larger than this limit, i/o will be
* done in chunks of this size. Most RPC requests and responses would be
* be smaller.
*/
private static int NIO_BUFFER_LIMIT = 8*1024; //should not be more than 64KB.
/**
* This is a wrapper around {@link WritableByteChannel#write(ByteBuffer)}.
* If the amount of data is large, it writes to channel in smaller chunks.
* This is to avoid jdk from creating many direct buffers as the size of
* buffer increases. This also minimizes extra copies in NIO layer
* as a result of multiple write operations required to write a large
* buffer.
*
* @see WritableByteChannel#write(ByteBuffer)
*/
private static int channelWrite(WritableByteChannel channel,
ByteBuffer buffer) throws IOException {
return (buffer.remaining() <= NIO_BUFFER_LIMIT) ?
channel.write(buffer) : channelIO(null, channel, buffer);
}
/**
* This is a wrapper around {@link ReadableByteChannel#read(ByteBuffer)}.
* If the amount of data is large, it writes to channel in smaller chunks.
* This is to avoid jdk from creating many direct buffers as the size of
* ByteBuffer increases. There should not be any performance degredation.
*
* @see ReadableByteChannel#read(ByteBuffer)
*/
private static int channelRead(ReadableByteChannel channel,
ByteBuffer buffer) throws IOException {
return (buffer.remaining() <= NIO_BUFFER_LIMIT) ?
channel.read(buffer) : channelIO(channel, null, buffer);
}
/**
* Helper for {@link #channelRead(ReadableByteChannel, ByteBuffer)}
* and {@link #channelWrite(WritableByteChannel, ByteBuffer)}. Only
* one of readCh or writeCh should be non-null.
*
* @see #channelRead(ReadableByteChannel, ByteBuffer)
* @see #channelWrite(WritableByteChannel, ByteBuffer)
*/
private static int channelIO(ReadableByteChannel readCh,
WritableByteChannel writeCh,
ByteBuffer buf) throws IOException {
int originalLimit = buf.limit();
int initialRemaining = buf.remaining();
int ret = 0;
while (buf.remaining() > 0) {
try {
int ioSize = Math.min(buf.remaining(), NIO_BUFFER_LIMIT);
buf.limit(buf.position() + ioSize);
ret = (readCh == null) ? writeCh.write(buf) : readCh.read(buf);
if (ret < ioSize) {
break;
}
} finally {
buf.limit(originalLimit);
}
}
int nBytes = initialRemaining - buf.remaining();
return (nBytes > 0) ? nBytes : ret;
}
}