HBASE-5190 Limit the IPC queue size based on calls' payload size

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1304634 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jean-Daniel Cryans 2012-03-23 22:32:31 +00:00
parent 86962028dd
commit 560173f756
1 changed files with 47 additions and 12 deletions

View File

@ -76,6 +76,8 @@ import org.apache.hadoop.util.StringUtils;
import com.google.common.base.Function; import com.google.common.base.Function;
import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.cliffc.high_scale_lib.Counter;
/** An abstract IPC service. IPC calls take a single {@link Writable} as a /** An abstract IPC service. IPC calls take a single {@link Writable} as a
* parameter, and return a {@link Writable} as their value. A service runs on * parameter, and return a {@link Writable} as their value. A service runs on
* a port and is defined by a parameter class and a value class. * a port and is defined by a parameter class and a value class.
@ -97,7 +99,13 @@ public abstract class HBaseServer implements RpcServer {
/** /**
* How many calls/handler are allowed in the queue. * How many calls/handler are allowed in the queue.
*/ */
private static final int DEFAULT_MAX_QUEUE_SIZE_PER_HANDLER = 10; private static final int DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER = 10;
/**
* The maximum size that we can hold in the IPC queue
*/
private static final int DEFAULT_MAX_CALLQUEUE_SIZE =
1024 * 1024 * 1024;
static final int BUFFER_INITIAL_SIZE = 1024; static final int BUFFER_INITIAL_SIZE = 1024;
@ -193,6 +201,7 @@ public abstract class HBaseServer implements RpcServer {
protected Configuration conf; protected Configuration conf;
private int maxQueueLength;
private int maxQueueSize; private int maxQueueSize;
protected int socketSendBufferSize; protected int socketSendBufferSize;
protected final boolean tcpNoDelay; // if T then disable Nagle's Algorithm protected final boolean tcpNoDelay; // if T then disable Nagle's Algorithm
@ -201,6 +210,7 @@ public abstract class HBaseServer implements RpcServer {
volatile protected boolean running = true; // true while server runs volatile protected boolean running = true; // true while server runs
protected BlockingQueue<Call> callQueue; // queued calls protected BlockingQueue<Call> callQueue; // queued calls
protected final Counter callQueueSize = new Counter();
protected BlockingQueue<Call> priorityCallQueue; protected BlockingQueue<Call> priorityCallQueue;
protected int highPriorityLevel; // what level a high priority call is at protected int highPriorityLevel; // what level a high priority call is at
@ -261,10 +271,11 @@ public abstract class HBaseServer implements RpcServer {
protected Responder responder; protected Responder responder;
protected boolean delayReturnValue; // if the return value should be protected boolean delayReturnValue; // if the return value should be
// set at call completion // set at call completion
protected long size; // size of current call
protected boolean isError; protected boolean isError;
public Call(int id, Writable param, Connection connection, public Call(int id, Writable param, Connection connection,
Responder responder) { Responder responder, long size) {
this.id = id; this.id = id;
this.param = param; this.param = param;
this.connection = connection; this.connection = connection;
@ -273,6 +284,7 @@ public abstract class HBaseServer implements RpcServer {
this.delayResponse = false; this.delayResponse = false;
this.responder = responder; this.responder = responder;
this.isError = false; this.isError = false;
this.size = size;
} }
@Override @Override
@ -405,6 +417,10 @@ public abstract class HBaseServer implements RpcServer {
return this.delayReturnValue; return this.delayReturnValue;
} }
public long getSize() {
return this.size;
}
/** /**
* If we have a response, and delay is not set, then respond * If we have a response, and delay is not set, then respond
* immediately. Otherwise, do not respond to client. This is * immediately. Otherwise, do not respond to client. This is
@ -1203,7 +1219,7 @@ public abstract class HBaseServer implements RpcServer {
// we return 0 which will keep the socket up -- bad clients, unless // we return 0 which will keep the socket up -- bad clients, unless
// they switch to suit the running server -- will fail later doing // they switch to suit the running server -- will fail later doing
// getProtocolVersion. // getProtocolVersion.
Call fakeCall = new Call(0, null, this, responder); Call fakeCall = new Call(0, null, this, responder, 0);
// Versions 3 and greater can interpret this exception // Versions 3 and greater can interpret this exception
// response in the same manner // response in the same manner
setupResponse(buffer, fakeCall, Status.FATAL, setupResponse(buffer, fakeCall, Status.FATAL,
@ -1235,9 +1251,23 @@ public abstract class HBaseServer implements RpcServer {
DataInputStream dis = DataInputStream dis =
new DataInputStream(new ByteArrayInputStream(buf)); new DataInputStream(new ByteArrayInputStream(buf));
int id = dis.readInt(); // try to read an id int id = dis.readInt(); // try to read an id
long callSize = buf.length;
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled()) {
LOG.debug(" got call #" + id + ", " + buf.length + " bytes"); LOG.debug(" got call #" + id + ", " + callSize + " bytes");
}
// Enforcing the call queue size, this triggers a retry in the client
if ((callSize + callQueueSize.get()) > maxQueueSize) {
final Call callTooBig =
new Call(id, null, this, responder, callSize);
ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
setupResponse(responseBuffer, callTooBig, Status.FATAL, null,
IOException.class.getName(),
"Call queue is full, is ipc.server.max.callqueue.size too small?");
responder.doRespond(callTooBig);
return;
}
Writable param; Writable param;
try { try {
@ -1246,7 +1276,8 @@ public abstract class HBaseServer implements RpcServer {
} catch (Throwable t) { } catch (Throwable t) {
LOG.warn("Unable to read call parameters for client " + LOG.warn("Unable to read call parameters for client " +
getHostAddress(), t); getHostAddress(), t);
final Call readParamsFailedCall = new Call(id, null, this, responder); final Call readParamsFailedCall =
new Call(id, null, this, responder, callSize);
ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream(); ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
setupResponse(responseBuffer, readParamsFailedCall, Status.FATAL, null, setupResponse(responseBuffer, readParamsFailedCall, Status.FATAL, null,
@ -1255,7 +1286,8 @@ public abstract class HBaseServer implements RpcServer {
responder.doRespond(readParamsFailedCall); responder.doRespond(readParamsFailedCall);
return; return;
} }
Call call = new Call(id, param, this, responder); Call call = new Call(id, param, this, responder, callSize);
callQueueSize.add(callSize);
if (priorityCallQueue != null && getQosLevel(param) > highPriorityLevel) { if (priorityCallQueue != null && getQosLevel(param) > highPriorityLevel) {
priorityCallQueue.put(call); priorityCallQueue.put(call);
@ -1360,7 +1392,7 @@ public abstract class HBaseServer implements RpcServer {
RequestContext.clear(); RequestContext.clear();
} }
CurCall.set(null); CurCall.set(null);
callQueueSize.add(call.getSize() * -1);
// Set the response for undelayed calls and delayed calls with // Set the response for undelayed calls and delayed calls with
// undelayed responses. // undelayed responses.
if (!call.isDelayed() || !call.isReturnValueDelayed()) { if (!call.isDelayed() || !call.isReturnValueDelayed()) {
@ -1443,15 +1475,18 @@ public abstract class HBaseServer implements RpcServer {
this.handlerCount = handlerCount; this.handlerCount = handlerCount;
this.priorityHandlerCount = priorityHandlerCount; this.priorityHandlerCount = priorityHandlerCount;
this.socketSendBufferSize = 0; this.socketSendBufferSize = 0;
this.maxQueueLength =
this.conf.getInt("ipc.server.max.callqueue.length",
handlerCount * DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER);
this.maxQueueSize = this.maxQueueSize =
this.conf.getInt("ipc.server.max.queue.size", this.conf.getInt("ipc.server.max.callqueue.size",
handlerCount * DEFAULT_MAX_QUEUE_SIZE_PER_HANDLER); DEFAULT_MAX_CALLQUEUE_SIZE);
this.readThreads = conf.getInt( this.readThreads = conf.getInt(
"ipc.server.read.threadpool.size", "ipc.server.read.threadpool.size",
10); 10);
this.callQueue = new LinkedBlockingQueue<Call>(maxQueueSize); this.callQueue = new LinkedBlockingQueue<Call>(maxQueueLength);
if (priorityHandlerCount > 0) { if (priorityHandlerCount > 0) {
this.priorityCallQueue = new LinkedBlockingQueue<Call>(maxQueueSize); // TODO hack on size this.priorityCallQueue = new LinkedBlockingQueue<Call>(maxQueueLength); // TODO hack on size
} else { } else {
this.priorityCallQueue = null; this.priorityCallQueue = null;
} }