HBASE-5560 Avoid RegionServer GC caused by timed-out calls

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1303512 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Michael Stack 2012-03-21 19:17:59 +00:00
parent ce36877d30
commit 250252f400
1 changed files with 7 additions and 5 deletions

View File

@ -59,6 +59,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.io.HbaseObjectWritable; import org.apache.hadoop.hbase.io.HbaseObjectWritable;
import org.apache.hadoop.hbase.io.WritableWithSize; import org.apache.hadoop.hbase.io.WritableWithSize;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler; import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
@ -196,6 +197,7 @@ public abstract class HBaseServer implements RpcServer {
protected int socketSendBufferSize; protected int socketSendBufferSize;
protected final boolean tcpNoDelay; // if T then disable Nagle's Algorithm protected final boolean tcpNoDelay; // if T then disable Nagle's Algorithm
protected final boolean tcpKeepAlive; // if T then use keepalives protected final boolean tcpKeepAlive; // if T then use keepalives
protected final long purgeTimeout; // in milliseconds
volatile protected boolean running = true; // true while server runs volatile protected boolean running = true; // true while server runs
protected BlockingQueue<Call> callQueue; // queued calls protected BlockingQueue<Call> callQueue; // queued calls
@ -753,8 +755,6 @@ public abstract class HBaseServer implements RpcServer {
private final Selector writeSelector; private final Selector writeSelector;
private int pending; // connections waiting to register private int pending; // connections waiting to register
final static int PURGE_INTERVAL = 900000; // 15mins
Responder() throws IOException { Responder() throws IOException {
this.setName("IPC Server Responder"); this.setName("IPC Server Responder");
this.setDaemon(true); this.setDaemon(true);
@ -784,7 +784,7 @@ public abstract class HBaseServer implements RpcServer {
while (running) { while (running) {
try { try {
waitPending(); // If a channel is being registered, wait. waitPending(); // If a channel is being registered, wait.
writeSelector.select(PURGE_INTERVAL); writeSelector.select(purgeTimeout);
Iterator<SelectionKey> iter = writeSelector.selectedKeys().iterator(); Iterator<SelectionKey> iter = writeSelector.selectedKeys().iterator();
while (iter.hasNext()) { while (iter.hasNext()) {
SelectionKey key = iter.next(); SelectionKey key = iter.next();
@ -798,7 +798,7 @@ public abstract class HBaseServer implements RpcServer {
} }
} }
long now = System.currentTimeMillis(); long now = System.currentTimeMillis();
if (now < lastPurgeTime + PURGE_INTERVAL) { if (now < lastPurgeTime + purgeTimeout) {
continue; continue;
} }
lastPurgeTime = now; lastPurgeTime = now;
@ -886,7 +886,7 @@ public abstract class HBaseServer implements RpcServer {
Iterator<Call> iter = call.connection.responseQueue.listIterator(0); Iterator<Call> iter = call.connection.responseQueue.listIterator(0);
while (iter.hasNext()) { while (iter.hasNext()) {
Call nextCall = iter.next(); Call nextCall = iter.next();
if (now > nextCall.timestamp + PURGE_INTERVAL) { if (now > nextCall.timestamp + purgeTimeout) {
closeConnection(nextCall.connection); closeConnection(nextCall.connection);
break; break;
} }
@ -1459,6 +1459,8 @@ public abstract class HBaseServer implements RpcServer {
this.maxIdleTime = 2*conf.getInt("ipc.client.connection.maxidletime", 1000); this.maxIdleTime = 2*conf.getInt("ipc.client.connection.maxidletime", 1000);
this.maxConnectionsToNuke = conf.getInt("ipc.client.kill.max", 10); this.maxConnectionsToNuke = conf.getInt("ipc.client.kill.max", 10);
this.thresholdIdleConnections = conf.getInt("ipc.client.idlethreshold", 4000); this.thresholdIdleConnections = conf.getInt("ipc.client.idlethreshold", 4000);
this.purgeTimeout = conf.getLong("ipc.client.call.purge.timeout",
2 * HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
// Start the listener here and let it bind to the port // Start the listener here and let it bind to the port
listener = new Listener(); listener = new Listener();