HBASE-15948 Port "HADOOP-9956 RPC listener inefficiently assigns connections to readers" Adds HADOOP-9955 RPC idle connection closing is extremely inefficient

Changes how we do accounting of Connections to match how it is done in Hadoop.
Adds a ConnectionManager class. Adds new configurations for this new class.

"hbase.ipc.client.idlethreshold" 4000
"hbase.ipc.client.connection.idle-scan-interval.ms" 10000
"hbase.ipc.client.connection.maxidletime" 10000
"hbase.ipc.client.kill.max", 10
"hbase.ipc.server.handler.queue.size", 100

The new scheme does away with synchronization that purportedly would freeze out
reads while we were cleaning up stale connections (according to HADOOP-9955)

Also adds in new mechanism for accepting Connections by pulling in as many
as we can at a time adding them to a Queue instead of doing one at a time.
Can help when bursty traffic according to HADOOP-9956. Removes a blocking
while Reader is busy parsing a request. Adds configuration
"hbase.ipc.server.read.connection-queue.size" with default of 100 for
queue size.

Signed-off-by: stack <stack@apache.org>
This commit is contained in:
stack 2016-06-02 16:59:52 -07:00
parent e66ecd7db6
commit 3a95552cfe
5 changed files with 255 additions and 186 deletions

View File

@ -52,14 +52,16 @@ public interface MetricsHBaseServerSource extends BaseSource {
String TOTAL_CALL_TIME_NAME = "totalCallTime"; String TOTAL_CALL_TIME_NAME = "totalCallTime";
String TOTAL_CALL_TIME_DESC = "Total call time, including both queued and processing time."; String TOTAL_CALL_TIME_DESC = "Total call time, including both queued and processing time.";
String QUEUE_SIZE_NAME = "queueSize"; String QUEUE_SIZE_NAME = "queueSize";
String QUEUE_SIZE_DESC = "Number of bytes in the call queues."; String QUEUE_SIZE_DESC = "Number of bytes in the call queues; request has been read and " +
"parsed and is waiting to run or is currently being executed.";
String GENERAL_QUEUE_NAME = "numCallsInGeneralQueue"; String GENERAL_QUEUE_NAME = "numCallsInGeneralQueue";
String GENERAL_QUEUE_DESC = "Number of calls in the general call queue."; String GENERAL_QUEUE_DESC = "Number of calls in the general call queue; " +
"parsed requests waiting in scheduler to be executed";
String PRIORITY_QUEUE_NAME = "numCallsInPriorityQueue"; String PRIORITY_QUEUE_NAME = "numCallsInPriorityQueue";
String REPLICATION_QUEUE_NAME = "numCallsInReplicationQueue"; String REPLICATION_QUEUE_NAME = "numCallsInReplicationQueue";
String REPLICATION_QUEUE_DESC = String REPLICATION_QUEUE_DESC =
"Number of calls in the replication call queue."; "Number of calls in the replication call queue waiting to be run";
String PRIORITY_QUEUE_DESC = "Number of calls in the priority call queue."; String PRIORITY_QUEUE_DESC = "Number of calls in the priority call queue waiting to be run";
String NUM_OPEN_CONNECTIONS_NAME = "numOpenConnections"; String NUM_OPEN_CONNECTIONS_NAME = "numOpenConnections";
String NUM_OPEN_CONNECTIONS_DESC = "Number of open connections."; String NUM_OPEN_CONNECTIONS_DESC = "Number of open connections.";
String NUM_ACTIVE_HANDLER_NAME = "numActiveHandler"; String NUM_ACTIVE_HANDLER_NAME = "numActiveHandler";

View File

@ -36,7 +36,7 @@ public class MetricsHBaseServerWrapperImpl implements MetricsHBaseServerWrapper
if (!isServerStarted()) { if (!isServerStarted()) {
return 0; return 0;
} }
return server.callQueueSize.get(); return server.callQueueSizeInBytes.get();
} }
@Override @Override
@ -65,10 +65,10 @@ public class MetricsHBaseServerWrapperImpl implements MetricsHBaseServerWrapper
@Override @Override
public int getNumOpenConnections() { public int getNumOpenConnections() {
if (!isServerStarted() || this.server.connectionList == null) { if (!isServerStarted()) {
return 0; return 0;
} }
return server.connectionList.size(); return server.getNumOpenConnections();
} }
@Override @Override

View File

@ -48,15 +48,17 @@ import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.Iterator; import java.util.Iterator;
import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Random;
import java.util.Set; import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
@ -113,6 +115,7 @@ import org.apache.hadoop.hbase.util.Counter;
import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils; import org.apache.hadoop.io.WritableUtils;
@ -183,11 +186,6 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
*/ */
static final int DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER = 10; static final int DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER = 10;
/**
* The maximum size that we can hold in the RPC queue
*/
private static final int DEFAULT_MAX_CALLQUEUE_SIZE = 1024 * 1024 * 1024;
private final IPCUtil ipcUtil; private final IPCUtil ipcUtil;
private static final String AUTH_FAILED_FOR = "Auth failed for "; private static final String AUTH_FAILED_FOR = "Auth failed for ";
@ -210,22 +208,30 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
protected int port; // port we listen on protected int port; // port we listen on
protected InetSocketAddress address; // inet address we listen on protected InetSocketAddress address; // inet address we listen on
private int readThreads; // number of read threads private int readThreads; // number of read threads
protected int maxIdleTime; // the maximum idle time after
// which a client may be
// disconnected
protected int thresholdIdleConnections; // the number of idle
// connections after which we
// will start cleaning up idle
// connections
int maxConnectionsToNuke; // the max number of
// connections to nuke
// during a cleanup
protected MetricsHBaseServer metrics; protected MetricsHBaseServer metrics;
protected final Configuration conf; protected final Configuration conf;
private int maxQueueSize; /**
* Maximum size in bytes of the currently queued and running Calls. If a new Call puts us over
* this size, then we will reject the call (after parsing it though). It will go back to the
* client and client will retry. Set this size with "hbase.ipc.server.max.callqueue.size". The
* call queue size gets incremented after we parse a call and before we add it to the queue of
* calls for the scheduler to use. It get decremented after we have 'run' the Call. The current
* size is kept in {@link #callQueueSizeInBytes}.
* @see {@link #callQueueSizeInBytes}
* @see {@link #DEFAULT_MAX_CALLQUEUE_SIZE}
* @see {@link #callQueueSizeInBytes}
*/
private final long maxQueueSizeInBytes;
private static final int DEFAULT_MAX_CALLQUEUE_SIZE = 1024 * 1024 * 1024;
/**
* This is a running count of the size in bytes of all outstanding calls whether currently
* executing or queued waiting to be run.
*/
protected final Counter callQueueSizeInBytes = new Counter();
protected int socketSendBufferSize; protected int socketSendBufferSize;
protected final boolean tcpNoDelay; // if T then disable Nagle's Algorithm protected final boolean tcpNoDelay; // if T then disable Nagle's Algorithm
protected final boolean tcpKeepAlive; // if T then use keepalives protected final boolean tcpKeepAlive; // if T then use keepalives
@ -244,19 +250,11 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
*/ */
volatile boolean started = false; volatile boolean started = false;
/** // maintains the set of client connections and handles idle timeouts
* This is a running count of the size of all outstanding calls by size. private ConnectionManager connectionManager;
*/
protected final Counter callQueueSize = new Counter();
protected final List<Connection> connectionList =
Collections.synchronizedList(new LinkedList<Connection>());
//maintain a list
//of client connections
private Listener listener = null; private Listener listener = null;
protected Responder responder = null; protected Responder responder = null;
protected AuthenticationTokenSecretManager authTokenSecretMgr = null; protected AuthenticationTokenSecretManager authTokenSecretMgr = null;
protected int numConnections = 0;
protected HBaseRPCErrorHandler errorHandler = null; protected HBaseRPCErrorHandler errorHandler = null;
@ -623,18 +621,16 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
private Selector selector = null; //the selector that we use for the server private Selector selector = null; //the selector that we use for the server
private Reader[] readers = null; private Reader[] readers = null;
private int currentReader = 0; private int currentReader = 0;
private Random rand = new Random(); private final int readerPendingConnectionQueueLength;
private long lastCleanupRunTime = 0; //the last time when a cleanup connec-
//-tion (for idle connections) ran
private long cleanupInterval = 10000; //the minimum interval between
//two cleanup runs
private int backlogLength;
private ExecutorService readPool; private ExecutorService readPool;
public Listener(final String name) throws IOException { public Listener(final String name) throws IOException {
super(name); super(name);
backlogLength = conf.getInt("hbase.ipc.server.listen.queue.size", 128); // The backlog of requests that we will have the serversocket carry.
int backlogLength = conf.getInt("hbase.ipc.server.listen.queue.size", 128);
readerPendingConnectionQueueLength =
conf.getInt("hbase.ipc.server.read.connection-queue.size", 100);
// Create a new server socket and set to non blocking mode // Create a new server socket and set to non blocking mode
acceptChannel = ServerSocketChannel.open(); acceptChannel = ServerSocketChannel.open();
acceptChannel.configureBlocking(false); acceptChannel.configureBlocking(false);
@ -644,9 +640,11 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
port = acceptChannel.socket().getLocalPort(); //Could be an ephemeral port port = acceptChannel.socket().getLocalPort(); //Could be an ephemeral port
address = (InetSocketAddress)acceptChannel.socket().getLocalSocketAddress(); address = (InetSocketAddress)acceptChannel.socket().getLocalSocketAddress();
// create a selector; // create a selector;
selector= Selector.open(); selector = Selector.open();
readers = new Reader[readThreads]; readers = new Reader[readThreads];
// Why this executor thing? Why not like hadoop just start up all the threads? I suppose it
// has an advantage in that it is easy to shutdown the pool.
readPool = Executors.newFixedThreadPool(readThreads, readPool = Executors.newFixedThreadPool(readThreads,
new ThreadFactoryBuilder().setNameFormat( new ThreadFactoryBuilder().setNameFormat(
"RpcServer.reader=%d,bindAddress=" + bindAddress.getHostName() + "RpcServer.reader=%d,bindAddress=" + bindAddress.getHostName() +
@ -667,12 +665,15 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
private class Reader implements Runnable { private class Reader implements Runnable {
private volatile boolean adding = false; final private LinkedBlockingQueue<Connection> pendingConnections;
private final Selector readSelector; private final Selector readSelector;
Reader() throws IOException { Reader() throws IOException {
this.pendingConnections =
new LinkedBlockingQueue<Connection>(readerPendingConnectionQueueLength);
this.readSelector = Selector.open(); this.readSelector = Selector.open();
} }
@Override @Override
public void run() { public void run() {
try { try {
@ -689,11 +690,14 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
private synchronized void doRunLoop() { private synchronized void doRunLoop() {
while (running) { while (running) {
try { try {
readSelector.select(); // Consume as many connections as currently queued to avoid
while (adding) { // unbridled acceptance of connections that starves the select
this.wait(1000); int size = pendingConnections.size();
for (int i=size; i>0; i--) {
Connection conn = pendingConnections.take();
conn.channel.register(readSelector, SelectionKey.OP_READ, conn);
} }
readSelector.select();
Iterator<SelectionKey> iter = readSelector.selectedKeys().iterator(); Iterator<SelectionKey> iter = readSelector.selectedKeys().iterator();
while (iter.hasNext()) { while (iter.hasNext()) {
SelectionKey key = iter.next(); SelectionKey key = iter.next();
@ -703,9 +707,12 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
doRead(key); doRead(key);
} }
} }
key = null;
} }
} catch (InterruptedException e) { } catch (InterruptedException e) {
LOG.debug("Interrupted while sleeping"); if (running) { // unexpected -- log it
LOG.info(Thread.currentThread().getName() + " unexpectedly interrupted", e);
}
return; return;
} catch (IOException ex) { } catch (IOException ex) {
LOG.info(getName() + ": IOException in Reader", ex); LOG.info(getName() + ": IOException in Reader", ex);
@ -714,76 +721,14 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
} }
/** /**
* This gets reader into the state that waits for the new channel * Updating the readSelector while it's being used is not thread-safe,
* to be registered with readSelector. If it was waiting in select() * so the connection must be queued. The reader will drain the queue
* the thread will be woken up, otherwise whenever select() is called * and update its readSelector before performing the next select
* it will return even if there is nothing to read and wait
* in while(adding) for finishAdd call
*/ */
public void startAdd() { public void addConnection(Connection conn) throws IOException {
adding = true; pendingConnections.add(conn);
readSelector.wakeup(); readSelector.wakeup();
} }
public synchronized SelectionKey registerChannel(SocketChannel channel)
throws IOException {
return channel.register(readSelector, SelectionKey.OP_READ);
}
public synchronized void finishAdd() {
adding = false;
this.notify();
}
}
/** cleanup connections from connectionList. Choose a random range
* to scan and also have a limit on the number of the connections
* that will be cleanedup per run. The criteria for cleanup is the time
* for which the connection was idle. If 'force' is true then all
* connections will be looked at for the cleanup.
* @param force all connections will be looked at for cleanup
*/
private void cleanupConnections(boolean force) {
if (force || numConnections > thresholdIdleConnections) {
long currentTime = System.currentTimeMillis();
if (!force && (currentTime - lastCleanupRunTime) < cleanupInterval) {
return;
}
int start = 0;
int end = numConnections - 1;
if (!force) {
start = rand.nextInt() % numConnections;
end = rand.nextInt() % numConnections;
int temp;
if (end < start) {
temp = start;
start = end;
end = temp;
}
}
int i = start;
int numNuked = 0;
while (i <= end) {
Connection c;
synchronized (connectionList) {
try {
c = connectionList.get(i);
} catch (Exception e) {return;}
}
if (c.timedOut(currentTime)) {
if (LOG.isDebugEnabled())
LOG.debug(getName() + ": disconnecting client " + c.getHostAddress());
closeConnection(c);
numNuked++;
end--;
//noinspection UnusedAssignment
c = null;
if (!force && numNuked == maxConnectionsToNuke) break;
}
else i++;
}
lastCleanupRunTime = System.currentTimeMillis();
}
} }
@Override @Override
@ -792,6 +737,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
"it will have per impact") "it will have per impact")
public void run() { public void run() {
LOG.info(getName() + ": starting"); LOG.info(getName() + ": starting");
connectionManager.startIdleScan();
while (running) { while (running) {
SelectionKey key = null; SelectionKey key = null;
try { try {
@ -815,7 +761,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
if (errorHandler.checkOOME(e)) { if (errorHandler.checkOOME(e)) {
LOG.info(getName() + ": exiting on OutOfMemoryError"); LOG.info(getName() + ": exiting on OutOfMemoryError");
closeCurrentConnection(key, e); closeCurrentConnection(key, e);
cleanupConnections(true); connectionManager.closeIdle(true);
return; return;
} }
} else { } else {
@ -824,22 +770,18 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
// some thread(s) a chance to finish // some thread(s) a chance to finish
LOG.warn(getName() + ": OutOfMemoryError in server select", e); LOG.warn(getName() + ": OutOfMemoryError in server select", e);
closeCurrentConnection(key, e); closeCurrentConnection(key, e);
cleanupConnections(true); connectionManager.closeIdle(true);
try { try {
Thread.sleep(60000); Thread.sleep(60000);
} catch (InterruptedException ex) { } catch (InterruptedException ex) {
LOG.debug("Interrupted while sleeping"); LOG.debug("Interrupted while sleeping");
return;
} }
} }
} catch (Exception e) { } catch (Exception e) {
closeCurrentConnection(key, e); closeCurrentConnection(key, e);
} }
cleanupConnections(false);
} }
LOG.info(getName() + ": stopping"); LOG.info(getName() + ": stopping");
synchronized (this) { synchronized (this) {
try { try {
acceptChannel.close(); acceptChannel.close();
@ -851,10 +793,9 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
selector= null; selector= null;
acceptChannel= null; acceptChannel= null;
// clean up all connections // close all connections
while (!connectionList.isEmpty()) { connectionManager.stopIdleScan();
closeConnection(connectionList.remove(0)); connectionManager.closeAll();
}
} }
} }
@ -862,10 +803,6 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
if (key != null) { if (key != null) {
Connection c = (Connection)key.attachment(); Connection c = (Connection)key.attachment();
if (c != null) { if (c != null) {
if (LOG.isDebugEnabled()) {
LOG.debug(getName() + ": disconnecting client " + c.getHostAddress() +
(e != null ? " on error " + e.getMessage() : ""));
}
closeConnection(c); closeConnection(c);
key.attach(null); key.attach(null);
} }
@ -876,37 +813,24 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
return address; return address;
} }
void doAccept(SelectionKey key) throws IOException, OutOfMemoryError { void doAccept(SelectionKey key) throws InterruptedException, IOException, OutOfMemoryError {
Connection c;
ServerSocketChannel server = (ServerSocketChannel) key.channel(); ServerSocketChannel server = (ServerSocketChannel) key.channel();
SocketChannel channel; SocketChannel channel;
while ((channel = server.accept()) != null) { while ((channel = server.accept()) != null) {
try { channel.configureBlocking(false);
channel.configureBlocking(false); channel.socket().setTcpNoDelay(tcpNoDelay);
channel.socket().setTcpNoDelay(tcpNoDelay); channel.socket().setKeepAlive(tcpKeepAlive);
channel.socket().setKeepAlive(tcpKeepAlive);
} catch (IOException ioe) {
channel.close();
throw ioe;
}
Reader reader = getReader(); Reader reader = getReader();
try { Connection c = connectionManager.register(channel);
reader.startAdd(); // If the connectionManager can't take it, close the connection.
SelectionKey readKey = reader.registerChannel(channel); if (c == null) {
c = getConnection(channel, System.currentTimeMillis()); if (channel.isOpen()) {
readKey.attach(c); IOUtils.cleanup(null, channel);
synchronized (connectionList) {
connectionList.add(numConnections, c);
numConnections++;
} }
if (LOG.isDebugEnabled()) continue;
LOG.debug(getName() + ": connection from " + c.toString() +
"; # active connections: " + numConnections);
} finally {
reader.finishAdd();
} }
key.attach(c); // so closeCurrentConnection can get the object
reader.addConnection(c);
} }
} }
@ -919,12 +843,8 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
c.setLastContact(System.currentTimeMillis()); c.setLastContact(System.currentTimeMillis());
try { try {
count = c.readAndProcess(); count = c.readAndProcess();
if (count > 0) {
c.setLastContact(System.currentTimeMillis());
}
} catch (InterruptedException ieo) { } catch (InterruptedException ieo) {
LOG.info(Thread.currentThread().getName() + ": readAndProcess caught InterruptedException", ieo);
throw ieo; throw ieo;
} catch (Exception e) { } catch (Exception e) {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
@ -933,12 +853,10 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
count = -1; //so that the (count < 0) block is executed count = -1; //so that the (count < 0) block is executed
} }
if (count < 0) { if (count < 0) {
if (LOG.isDebugEnabled()) {
LOG.debug(getName() + ": DISCONNECTING client " + c.toString() +
" because read count=" + count +
". Number of active connections: " + numConnections);
}
closeConnection(c); closeConnection(c);
c = null;
} else {
c.setLastContact(System.currentTimeMillis());
} }
} }
@ -957,6 +875,8 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
readPool.shutdownNow(); readPool.shutdownNow();
} }
synchronized Selector getSelector() { return selector; }
// The method that will return the next reader to work with // The method that will return the next reader to work with
// Simplistic implementation of round robin for now // Simplistic implementation of round robin for now
Reader getReader() { Reader getReader() {
@ -1355,6 +1275,10 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
return null; return null;
} }
public long getLastContact() {
return lastContact;
}
/* Return true if the connection has no outstanding rpc */ /* Return true if the connection has no outstanding rpc */
private boolean isIdle() { private boolean isIdle() {
return rpcCount.get() == 0; return rpcCount.get() == 0;
@ -1370,10 +1294,6 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
rpcCount.increment(); rpcCount.increment();
} }
protected boolean timedOut(long currentTime) {
return isIdle() && currentTime - lastContact > maxIdleTime;
}
private UserGroupInformation getAuthorizedUgi(String authorizedId) private UserGroupInformation getAuthorizedUgi(String authorizedId)
throws IOException { throws IOException {
UserGroupInformation authorizedUgi; UserGroupInformation authorizedUgi;
@ -1883,7 +1803,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
} }
// Enforcing the call queue size, this triggers a retry in the client // Enforcing the call queue size, this triggers a retry in the client
// This is a bit late to be doing this check - we have already read in the total request. // This is a bit late to be doing this check - we have already read in the total request.
if ((totalRequestSize + callQueueSize.get()) > maxQueueSize) { if ((totalRequestSize + callQueueSizeInBytes.get()) > maxQueueSizeInBytes) {
final Call callTooBig = final Call callTooBig =
new Call(id, this.service, null, null, null, null, this, new Call(id, this.service, null, null, null, null, this,
responder, totalRequestSize, null, null, 0); responder, totalRequestSize, null, null, 0);
@ -1954,7 +1874,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
totalRequestSize, traceInfo, this.addr, timeout); totalRequestSize, traceInfo, this.addr, timeout);
if (!scheduler.dispatch(new CallRunner(RpcServer.this, call))) { if (!scheduler.dispatch(new CallRunner(RpcServer.this, call))) {
callQueueSize.add(-1 * call.getSize()); callQueueSizeInBytes.add(-1 * call.getSize());
ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream(); ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
metrics.exception(CALL_QUEUE_TOO_BIG_EXCEPTION); metrics.exception(CALL_QUEUE_TOO_BIG_EXCEPTION);
@ -2093,12 +2013,10 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
this.bindAddress = bindAddress; this.bindAddress = bindAddress;
this.conf = conf; this.conf = conf;
this.socketSendBufferSize = 0; this.socketSendBufferSize = 0;
this.maxQueueSize = // See declaration above for documentation on what this size is.
this.conf.getInt("hbase.ipc.server.max.callqueue.size", DEFAULT_MAX_CALLQUEUE_SIZE); this.maxQueueSizeInBytes =
this.conf.getLong("hbase.ipc.server.max.callqueue.size", DEFAULT_MAX_CALLQUEUE_SIZE);
this.readThreads = conf.getInt("hbase.ipc.server.read.threadpool.size", 10); this.readThreads = conf.getInt("hbase.ipc.server.read.threadpool.size", 10);
this.maxIdleTime = 2 * conf.getInt("hbase.ipc.client.connection.maxidletime", 1000);
this.maxConnectionsToNuke = conf.getInt("hbase.ipc.client.kill.max", 10);
this.thresholdIdleConnections = conf.getInt("hbase.ipc.client.idlethreshold", 4000);
this.purgeTimeout = conf.getLong("hbase.ipc.client.call.purge.timeout", this.purgeTimeout = conf.getLong("hbase.ipc.client.call.purge.timeout",
2 * HConstants.DEFAULT_HBASE_RPC_TIMEOUT); 2 * HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
this.warnResponseTime = conf.getInt(WARN_RESPONSE_TIME, DEFAULT_WARN_RESPONSE_TIME); this.warnResponseTime = conf.getInt(WARN_RESPONSE_TIME, DEFAULT_WARN_RESPONSE_TIME);
@ -2120,6 +2038,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
// Create the responder here // Create the responder here
responder = new Responder(); responder = new Responder();
connectionManager = new ConnectionManager();
this.authorize = conf.getBoolean(HADOOP_SECURITY_AUTHORIZATION, false); this.authorize = conf.getBoolean(HADOOP_SECURITY_AUTHORIZATION, false);
this.userProvider = UserProvider.instantiate(conf); this.userProvider = UserProvider.instantiate(conf);
this.isSecurityEnabled = userProvider.isHBaseSecurityEnabled(); this.isSecurityEnabled = userProvider.isHBaseSecurityEnabled();
@ -2177,12 +2096,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
} }
protected void closeConnection(Connection connection) { protected void closeConnection(Connection connection) {
synchronized (connectionList) { connectionManager.close(connection);
if (connectionList.remove(connection)) {
numConnections--;
}
}
connection.close();
} }
Configuration getConf() { Configuration getConf() {
@ -2440,7 +2354,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
@Override @Override
public void addCallSize(final long diff) { public void addCallSize(final long diff) {
this.callQueueSize.add(diff); this.callQueueSizeInBytes.add(diff);
} }
/** /**
@ -2577,6 +2491,14 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
return ctx == null? null: ctx.getRequestUser(); return ctx == null? null: ctx.getRequestUser();
} }
/**
* The number of open RPC conections
* @return the number of open rpc connections
*/
public int getNumOpenConnections() {
return connectionManager.size();
}
/** /**
* Returns the username for any user associated with the current RPC * Returns the username for any user associated with the current RPC
* request or <code>null</code> if no user is set. * request or <code>null</code> if no user is set.
@ -2695,4 +2617,149 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
public RpcScheduler getScheduler() { public RpcScheduler getScheduler() {
return scheduler; return scheduler;
} }
private class ConnectionManager {
final private AtomicInteger count = new AtomicInteger();
final private Set<Connection> connections;
final private Timer idleScanTimer;
final private int idleScanThreshold;
final private int idleScanInterval;
final private int maxIdleTime;
final private int maxIdleToClose;
ConnectionManager() {
this.idleScanTimer = new Timer("RpcServer idle connection scanner for port " + port, true);
this.idleScanThreshold = conf.getInt("hbase.ipc.client.idlethreshold", 4000);
this.idleScanInterval =
conf.getInt("hbase.ipc.client.connection.idle-scan-interval.ms", 10000);
this.maxIdleTime = 2 * conf.getInt("hbase.ipc.client.connection.maxidletime", 10000);
this.maxIdleToClose = conf.getInt("hbase.ipc.client.kill.max", 10);
int handlerCount = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT);
int maxConnectionQueueSize =
handlerCount * conf.getInt("hbase.ipc.server.handler.queue.size", 100);
// create a set with concurrency -and- a thread-safe iterator, add 2
// for listener and idle closer threads
this.connections = Collections.newSetFromMap(
new ConcurrentHashMap<Connection,Boolean>(
maxConnectionQueueSize, 0.75f, readThreads+2));
}
private boolean add(Connection connection) {
boolean added = connections.add(connection);
if (added) {
count.getAndIncrement();
}
return added;
}
private boolean remove(Connection connection) {
boolean removed = connections.remove(connection);
if (removed) {
count.getAndDecrement();
}
return removed;
}
int size() {
return count.get();
}
Connection[] toArray() {
return connections.toArray(new Connection[0]);
}
Connection register(SocketChannel channel) {
Connection connection = new Connection(channel, System.currentTimeMillis());
add(connection);
if (LOG.isDebugEnabled()) {
LOG.debug("Server connection from " + connection +
"; connections=" + size() +
", queued calls size (bytes)=" + callQueueSizeInBytes.get() +
", general queued calls=" + scheduler.getGeneralQueueLength() +
", priority queued calls=" + scheduler.getPriorityQueueLength());
}
return connection;
}
boolean close(Connection connection) {
boolean exists = remove(connection);
if (exists) {
if (LOG.isDebugEnabled()) {
LOG.debug(Thread.currentThread().getName() +
": disconnecting client " + connection +
". Number of active connections: "+ size());
}
// only close if actually removed to avoid double-closing due
// to possible races
connection.close();
}
return exists;
}
// synch'ed to avoid explicit invocation upon OOM from colliding with
// timer task firing
synchronized void closeIdle(boolean scanAll) {
long minLastContact = System.currentTimeMillis() - maxIdleTime;
// concurrent iterator might miss new connections added
// during the iteration, but that's ok because they won't
// be idle yet anyway and will be caught on next scan
int closed = 0;
for (Connection connection : connections) {
// stop if connections dropped below threshold unless scanning all
if (!scanAll && size() < idleScanThreshold) {
break;
}
// stop if not scanning all and max connections are closed
if (connection.isIdle() &&
connection.getLastContact() < minLastContact &&
close(connection) &&
!scanAll && (++closed == maxIdleToClose)) {
break;
}
}
}
void closeAll() {
// use a copy of the connections to be absolutely sure the concurrent
// iterator doesn't miss a connection
for (Connection connection : toArray()) {
close(connection);
}
}
void startIdleScan() {
scheduleIdleScanTask();
}
void stopIdleScan() {
idleScanTimer.cancel();
}
private void scheduleIdleScanTask() {
if (!running) {
return;
}
TimerTask idleScanTask = new TimerTask(){
@Override
public void run() {
if (!running) {
return;
}
if (LOG.isDebugEnabled()) {
LOG.debug(Thread.currentThread().getName()+": task running");
}
try {
closeIdle(false);
} finally {
// explicitly reschedule so next execution occurs relative
// to the end of this scan, not the beginning
scheduleIdleScanTask();
}
}
};
idleScanTimer.schedule(idleScanTask, idleScanInterval);
}
}
} }

View File

@ -41,7 +41,7 @@ public class SimpleRpcSchedulerFactory implements RpcSchedulerFactory {
@Override @Override
public RpcScheduler create(Configuration conf, PriorityFunction priority, Abortable server) { public RpcScheduler create(Configuration conf, PriorityFunction priority, Abortable server) {
int handlerCount = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT, int handlerCount = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT); HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT);
return new SimpleRpcScheduler( return new SimpleRpcScheduler(
conf, conf,

View File

@ -263,7 +263,7 @@ public abstract class AbstractTestIPC {
fail("Expected an exception to have been thrown!"); fail("Expected an exception to have been thrown!");
} catch (Exception e) { } catch (Exception e) {
LOG.info("Caught expected exception: " + e.toString()); LOG.info("Caught expected exception: " + e.toString());
assertTrue(StringUtils.stringifyException(e).contains("Injected fault")); assertTrue(e.toString(), StringUtils.stringifyException(e).contains("Injected fault"));
} finally { } finally {
rpcServer.stop(); rpcServer.stop();
} }