HBASE-3581 hbase rpc should send size of response

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1185617 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Michael Stack 2011-10-18 12:20:01 +00:00
parent 6217443231
commit 54acda49f9
3 changed files with 36 additions and 12 deletions

View File

@ -624,6 +624,7 @@ Release 0.92.0 - Unreleased
HBASE-4558 Addendum for TestMasterFailover (Ram) - Breaks the build HBASE-4558 Addendum for TestMasterFailover (Ram) - Breaks the build
HBASE-4568 Make zk dump jsp response faster HBASE-4568 Make zk dump jsp response faster
HBASE-4606 Remove spam in HCM and fix a list.size == 0 HBASE-4606 Remove spam in HCM and fix a list.size == 0
HBASE-3581 hbase rpc should send size of response
TASKS TASKS

View File

@ -33,14 +33,10 @@ import java.net.Socket;
import java.net.SocketException; import java.net.SocketException;
import java.net.SocketTimeoutException; import java.net.SocketTimeoutException;
import java.net.UnknownHostException; import java.net.UnknownHostException;
import java.util.Hashtable;
import java.util.Iterator; import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import javax.net.SocketFactory; import javax.net.SocketFactory;
@ -280,7 +276,7 @@ public class HBaseClient {
* otherwise, throw the timeout exception. * otherwise, throw the timeout exception.
*/ */
private void handleTimeout(SocketTimeoutException e) throws IOException { private void handleTimeout(SocketTimeoutException e) throws IOException {
if (shouldCloseConnection.get() || !running.get() || if (shouldCloseConnection.get() || !running.get() ||
remoteId.rpcTimeout > 0) { remoteId.rpcTimeout > 0) {
throw e; throw e;
} }
@ -552,14 +548,24 @@ public class HBaseClient {
touch(); touch();
try { try {
int id = in.readInt(); // try to read an id // See HBaseServer.Call.setResponse for where we write out the response.
// It writes the call.id (int), a flag byte, then optionally the length
// of the response (int) followed by data.
// Read the call id.
int id = in.readInt();
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug(getName() + " got value #" + id); LOG.debug(getName() + " got value #" + id);
Call call = calls.remove(id); Call call = calls.remove(id);
boolean isError = in.readBoolean(); // read if error // Read the flag byte
byte flag = in.readByte();
boolean isError = ResponseFlag.isError(flag);
if (ResponseFlag.isLength(flag)) {
// Currently length if present is unused.
in.readInt();
}
if (isError) { if (isError) {
//noinspection ThrowableInstanceNeverThrown //noinspection ThrowableInstanceNeverThrown
call.setException(new RemoteException( WritableUtils.readString(in), call.setException(new RemoteException( WritableUtils.readString(in),

View File

@ -298,7 +298,8 @@ public abstract class HBaseServer implements RpcServer {
if (result instanceof WritableWithSize) { if (result instanceof WritableWithSize) {
// get the size hint. // get the size hint.
WritableWithSize ohint = (WritableWithSize) result; WritableWithSize ohint = (WritableWithSize) result;
long hint = ohint.getWritableSize() + Bytes.SIZEOF_BYTE + Bytes.SIZEOF_INT; long hint = ohint.getWritableSize() + Bytes.SIZEOF_BYTE +
(2 * Bytes.SIZEOF_INT);
if (hint > Integer.MAX_VALUE) { if (hint > Integer.MAX_VALUE) {
// oops, new problem. // oops, new problem.
IOException ioe = IOException ioe =
@ -313,8 +314,15 @@ public abstract class HBaseServer implements RpcServer {
ByteBufferOutputStream buf = new ByteBufferOutputStream(size); ByteBufferOutputStream buf = new ByteBufferOutputStream(size);
DataOutputStream out = new DataOutputStream(buf); DataOutputStream out = new DataOutputStream(buf);
try { try {
out.writeInt(this.id); // write call id // Call id.
out.writeBoolean(error != null); // write error flag out.writeInt(this.id);
// Write flag.
byte flag = (error != null)?
ResponseFlag.getErrorAndLengthSet(): ResponseFlag.getLengthSetOnly();
out.writeByte(flag);
// Place holder for length set later below after we
// fill the buffer with data.
out.writeInt(0xdeadbeef);
} catch (IOException e) { } catch (IOException e) {
errorClass = e.getClass().getName(); errorClass = e.getClass().getName();
error = StringUtils.stringifyException(e); error = StringUtils.stringifyException(e);
@ -331,7 +339,16 @@ public abstract class HBaseServer implements RpcServer {
LOG.warn("Error sending response to call: ", e); LOG.warn("Error sending response to call: ", e);
} }
this.response = buf.getByteBuffer(); // Set the length into the ByteBuffer after call id and after
// byte flag.
ByteBuffer bb = buf.getByteBuffer();
int bufSiz = bb.remaining();
// Move to the size location in our ByteBuffer past call.id
// and past the byte flag.
bb.position(Bytes.SIZEOF_INT + Bytes.SIZEOF_BYTE);
bb.putInt(bufSiz);
bb.position(0);
this.response = bb;
} }
@Override @Override