diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java index bb897896a01..ce57e0fe3f9 100644 --- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java +++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java @@ -52,14 +52,16 @@ public interface MetricsHBaseServerSource extends BaseSource { String TOTAL_CALL_TIME_NAME = "totalCallTime"; String TOTAL_CALL_TIME_DESC = "Total call time, including both queued and processing time."; String QUEUE_SIZE_NAME = "queueSize"; - String QUEUE_SIZE_DESC = "Number of bytes in the call queues."; + String QUEUE_SIZE_DESC = "Number of bytes in the call queues; request has been read and " + + "parsed and is waiting to run or is currently being executed."; String GENERAL_QUEUE_NAME = "numCallsInGeneralQueue"; - String GENERAL_QUEUE_DESC = "Number of calls in the general call queue."; + String GENERAL_QUEUE_DESC = "Number of calls in the general call queue; " + + "parsed requests waiting in scheduler to be executed"; String PRIORITY_QUEUE_NAME = "numCallsInPriorityQueue"; String REPLICATION_QUEUE_NAME = "numCallsInReplicationQueue"; String REPLICATION_QUEUE_DESC = - "Number of calls in the replication call queue."; - String PRIORITY_QUEUE_DESC = "Number of calls in the priority call queue."; + "Number of calls in the replication call queue waiting to be run"; + String PRIORITY_QUEUE_DESC = "Number of calls in the priority call queue waiting to be run"; String NUM_OPEN_CONNECTIONS_NAME = "numOpenConnections"; String NUM_OPEN_CONNECTIONS_DESC = "Number of open connections."; String NUM_ACTIVE_HANDLER_NAME = "numActiveHandler"; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapperImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapperImpl.java index 9979c75aebf..4f537091a7d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapperImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapperImpl.java @@ -36,7 +36,7 @@ public class MetricsHBaseServerWrapperImpl implements MetricsHBaseServerWrapper if (!isServerStarted()) { return 0; } - return server.callQueueSize.get(); + return server.callQueueSizeInBytes.get(); } @Override @@ -65,10 +65,10 @@ public class MetricsHBaseServerWrapperImpl implements MetricsHBaseServerWrapper @Override public int getNumOpenConnections() { - if (!isServerStarted() || this.server.connectionList == null) { + if (!isServerStarted()) { return 0; } - return server.connectionList.size(); + return server.getNumOpenConnections(); } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java index 483ce86ef51..1087c42cfa3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java @@ -48,15 +48,17 @@ import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.Iterator; -import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.Random; import java.util.Set; +import java.util.Timer; +import java.util.TimerTask; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -113,6 +115,7 @@ import org.apache.hadoop.hbase.util.Counter; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableUtils; @@ -183,11 +186,6 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { */ static final int DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER = 10; - /** - * The maximum size that we can hold in the RPC queue - */ - private static final int DEFAULT_MAX_CALLQUEUE_SIZE = 1024 * 1024 * 1024; - private final IPCUtil ipcUtil; private static final String AUTH_FAILED_FOR = "Auth failed for "; @@ -210,22 +208,30 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { protected int port; // port we listen on protected InetSocketAddress address; // inet address we listen on private int readThreads; // number of read threads - protected int maxIdleTime; // the maximum idle time after - // which a client may be - // disconnected - protected int thresholdIdleConnections; // the number of idle - // connections after which we - // will start cleaning up idle - // connections - int maxConnectionsToNuke; // the max number of - // connections to nuke - // during a cleanup - protected MetricsHBaseServer metrics; protected final Configuration conf; - private int maxQueueSize; + /** + * Maximum size in bytes of the currently queued and running Calls. If a new Call puts us over + * this size, then we will reject the call (after parsing it though). It will go back to the + * client and client will retry. Set this size with "hbase.ipc.server.max.callqueue.size". The + * call queue size gets incremented after we parse a call and before we add it to the queue of + * calls for the scheduler to use. It get decremented after we have 'run' the Call. The current + * size is kept in {@link #callQueueSizeInBytes}. + * @see {@link #callQueueSizeInBytes} + * @see {@link #DEFAULT_MAX_CALLQUEUE_SIZE} + * @see {@link #callQueueSizeInBytes} + */ + private final long maxQueueSizeInBytes; + private static final int DEFAULT_MAX_CALLQUEUE_SIZE = 1024 * 1024 * 1024; + + /** + * This is a running count of the size in bytes of all outstanding calls whether currently + * executing or queued waiting to be run. + */ + protected final Counter callQueueSizeInBytes = new Counter(); + protected int socketSendBufferSize; protected final boolean tcpNoDelay; // if T then disable Nagle's Algorithm protected final boolean tcpKeepAlive; // if T then use keepalives @@ -244,19 +250,11 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { */ volatile boolean started = false; - /** - * This is a running count of the size of all outstanding calls by size. - */ - protected final Counter callQueueSize = new Counter(); - - protected final List connectionList = - Collections.synchronizedList(new LinkedList()); - //maintain a list - //of client connections + // maintains the set of client connections and handles idle timeouts + private ConnectionManager connectionManager; private Listener listener = null; protected Responder responder = null; protected AuthenticationTokenSecretManager authTokenSecretMgr = null; - protected int numConnections = 0; protected HBaseRPCErrorHandler errorHandler = null; @@ -623,18 +621,16 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { private Selector selector = null; //the selector that we use for the server private Reader[] readers = null; private int currentReader = 0; - private Random rand = new Random(); - private long lastCleanupRunTime = 0; //the last time when a cleanup connec- - //-tion (for idle connections) ran - private long cleanupInterval = 10000; //the minimum interval between - //two cleanup runs - private int backlogLength; + private final int readerPendingConnectionQueueLength; private ExecutorService readPool; public Listener(final String name) throws IOException { super(name); - backlogLength = conf.getInt("hbase.ipc.server.listen.queue.size", 128); + // The backlog of requests that we will have the serversocket carry. + int backlogLength = conf.getInt("hbase.ipc.server.listen.queue.size", 128); + readerPendingConnectionQueueLength = + conf.getInt("hbase.ipc.server.read.connection-queue.size", 100); // Create a new server socket and set to non blocking mode acceptChannel = ServerSocketChannel.open(); acceptChannel.configureBlocking(false); @@ -644,9 +640,11 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { port = acceptChannel.socket().getLocalPort(); //Could be an ephemeral port address = (InetSocketAddress)acceptChannel.socket().getLocalSocketAddress(); // create a selector; - selector= Selector.open(); + selector = Selector.open(); readers = new Reader[readThreads]; + // Why this executor thing? Why not like hadoop just start up all the threads? I suppose it + // has an advantage in that it is easy to shutdown the pool. readPool = Executors.newFixedThreadPool(readThreads, new ThreadFactoryBuilder().setNameFormat( "RpcServer.reader=%d,bindAddress=" + bindAddress.getHostName() + @@ -667,12 +665,15 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { private class Reader implements Runnable { - private volatile boolean adding = false; + final private LinkedBlockingQueue pendingConnections; private final Selector readSelector; Reader() throws IOException { + this.pendingConnections = + new LinkedBlockingQueue(readerPendingConnectionQueueLength); this.readSelector = Selector.open(); } + @Override public void run() { try { @@ -689,11 +690,14 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { private synchronized void doRunLoop() { while (running) { try { - readSelector.select(); - while (adding) { - this.wait(1000); + // Consume as many connections as currently queued to avoid + // unbridled acceptance of connections that starves the select + int size = pendingConnections.size(); + for (int i=size; i>0; i--) { + Connection conn = pendingConnections.take(); + conn.channel.register(readSelector, SelectionKey.OP_READ, conn); } - + readSelector.select(); Iterator iter = readSelector.selectedKeys().iterator(); while (iter.hasNext()) { SelectionKey key = iter.next(); @@ -703,9 +707,12 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { doRead(key); } } + key = null; } } catch (InterruptedException e) { - LOG.debug("Interrupted while sleeping"); + if (running) { // unexpected -- log it + LOG.info(Thread.currentThread().getName() + " unexpectedly interrupted", e); + } return; } catch (IOException ex) { LOG.info(getName() + ": IOException in Reader", ex); @@ -714,76 +721,14 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { } /** - * This gets reader into the state that waits for the new channel - * to be registered with readSelector. If it was waiting in select() - * the thread will be woken up, otherwise whenever select() is called - * it will return even if there is nothing to read and wait - * in while(adding) for finishAdd call + * Updating the readSelector while it's being used is not thread-safe, + * so the connection must be queued. The reader will drain the queue + * and update its readSelector before performing the next select */ - public void startAdd() { - adding = true; + public void addConnection(Connection conn) throws IOException { + pendingConnections.add(conn); readSelector.wakeup(); } - - public synchronized SelectionKey registerChannel(SocketChannel channel) - throws IOException { - return channel.register(readSelector, SelectionKey.OP_READ); - } - - public synchronized void finishAdd() { - adding = false; - this.notify(); - } - } - - /** cleanup connections from connectionList. Choose a random range - * to scan and also have a limit on the number of the connections - * that will be cleanedup per run. The criteria for cleanup is the time - * for which the connection was idle. If 'force' is true then all - * connections will be looked at for the cleanup. - * @param force all connections will be looked at for cleanup - */ - private void cleanupConnections(boolean force) { - if (force || numConnections > thresholdIdleConnections) { - long currentTime = System.currentTimeMillis(); - if (!force && (currentTime - lastCleanupRunTime) < cleanupInterval) { - return; - } - int start = 0; - int end = numConnections - 1; - if (!force) { - start = rand.nextInt() % numConnections; - end = rand.nextInt() % numConnections; - int temp; - if (end < start) { - temp = start; - start = end; - end = temp; - } - } - int i = start; - int numNuked = 0; - while (i <= end) { - Connection c; - synchronized (connectionList) { - try { - c = connectionList.get(i); - } catch (Exception e) {return;} - } - if (c.timedOut(currentTime)) { - if (LOG.isDebugEnabled()) - LOG.debug(getName() + ": disconnecting client " + c.getHostAddress()); - closeConnection(c); - numNuked++; - end--; - //noinspection UnusedAssignment - c = null; - if (!force && numNuked == maxConnectionsToNuke) break; - } - else i++; - } - lastCleanupRunTime = System.currentTimeMillis(); - } } @Override @@ -792,6 +737,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { "it will have per impact") public void run() { LOG.info(getName() + ": starting"); + connectionManager.startIdleScan(); while (running) { SelectionKey key = null; try { @@ -815,7 +761,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { if (errorHandler.checkOOME(e)) { LOG.info(getName() + ": exiting on OutOfMemoryError"); closeCurrentConnection(key, e); - cleanupConnections(true); + connectionManager.closeIdle(true); return; } } else { @@ -824,22 +770,18 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { // some thread(s) a chance to finish LOG.warn(getName() + ": OutOfMemoryError in server select", e); closeCurrentConnection(key, e); - cleanupConnections(true); + connectionManager.closeIdle(true); try { Thread.sleep(60000); } catch (InterruptedException ex) { LOG.debug("Interrupted while sleeping"); - return; } } } catch (Exception e) { closeCurrentConnection(key, e); } - cleanupConnections(false); } - LOG.info(getName() + ": stopping"); - synchronized (this) { try { acceptChannel.close(); @@ -851,10 +793,9 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { selector= null; acceptChannel= null; - // clean up all connections - while (!connectionList.isEmpty()) { - closeConnection(connectionList.remove(0)); - } + // close all connections + connectionManager.stopIdleScan(); + connectionManager.closeAll(); } } @@ -862,10 +803,6 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { if (key != null) { Connection c = (Connection)key.attachment(); if (c != null) { - if (LOG.isDebugEnabled()) { - LOG.debug(getName() + ": disconnecting client " + c.getHostAddress() + - (e != null ? " on error " + e.getMessage() : "")); - } closeConnection(c); key.attach(null); } @@ -876,37 +813,24 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { return address; } - void doAccept(SelectionKey key) throws IOException, OutOfMemoryError { - Connection c; + void doAccept(SelectionKey key) throws InterruptedException, IOException, OutOfMemoryError { ServerSocketChannel server = (ServerSocketChannel) key.channel(); - SocketChannel channel; while ((channel = server.accept()) != null) { - try { - channel.configureBlocking(false); - channel.socket().setTcpNoDelay(tcpNoDelay); - channel.socket().setKeepAlive(tcpKeepAlive); - } catch (IOException ioe) { - channel.close(); - throw ioe; - } - + channel.configureBlocking(false); + channel.socket().setTcpNoDelay(tcpNoDelay); + channel.socket().setKeepAlive(tcpKeepAlive); Reader reader = getReader(); - try { - reader.startAdd(); - SelectionKey readKey = reader.registerChannel(channel); - c = getConnection(channel, System.currentTimeMillis()); - readKey.attach(c); - synchronized (connectionList) { - connectionList.add(numConnections, c); - numConnections++; + Connection c = connectionManager.register(channel); + // If the connectionManager can't take it, close the connection. + if (c == null) { + if (channel.isOpen()) { + IOUtils.cleanup(null, channel); } - if (LOG.isDebugEnabled()) - LOG.debug(getName() + ": connection from " + c.toString() + - "; # active connections: " + numConnections); - } finally { - reader.finishAdd(); + continue; } + key.attach(c); // so closeCurrentConnection can get the object + reader.addConnection(c); } } @@ -919,12 +843,8 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { c.setLastContact(System.currentTimeMillis()); try { count = c.readAndProcess(); - - if (count > 0) { - c.setLastContact(System.currentTimeMillis()); - } - } catch (InterruptedException ieo) { + LOG.info(Thread.currentThread().getName() + ": readAndProcess caught InterruptedException", ieo); throw ieo; } catch (Exception e) { if (LOG.isDebugEnabled()) { @@ -933,12 +853,10 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { count = -1; //so that the (count < 0) block is executed } if (count < 0) { - if (LOG.isDebugEnabled()) { - LOG.debug(getName() + ": DISCONNECTING client " + c.toString() + - " because read count=" + count + - ". Number of active connections: " + numConnections); - } closeConnection(c); + c = null; + } else { + c.setLastContact(System.currentTimeMillis()); } } @@ -957,6 +875,8 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { readPool.shutdownNow(); } + synchronized Selector getSelector() { return selector; } + // The method that will return the next reader to work with // Simplistic implementation of round robin for now Reader getReader() { @@ -1355,6 +1275,10 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { return null; } + public long getLastContact() { + return lastContact; + } + /* Return true if the connection has no outstanding rpc */ private boolean isIdle() { return rpcCount.get() == 0; @@ -1370,10 +1294,6 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { rpcCount.increment(); } - protected boolean timedOut(long currentTime) { - return isIdle() && currentTime - lastContact > maxIdleTime; - } - private UserGroupInformation getAuthorizedUgi(String authorizedId) throws IOException { UserGroupInformation authorizedUgi; @@ -1883,7 +1803,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { } // Enforcing the call queue size, this triggers a retry in the client // This is a bit late to be doing this check - we have already read in the total request. - if ((totalRequestSize + callQueueSize.get()) > maxQueueSize) { + if ((totalRequestSize + callQueueSizeInBytes.get()) > maxQueueSizeInBytes) { final Call callTooBig = new Call(id, this.service, null, null, null, null, this, responder, totalRequestSize, null, null, 0); @@ -1954,7 +1874,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { totalRequestSize, traceInfo, this.addr, timeout); if (!scheduler.dispatch(new CallRunner(RpcServer.this, call))) { - callQueueSize.add(-1 * call.getSize()); + callQueueSizeInBytes.add(-1 * call.getSize()); ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream(); metrics.exception(CALL_QUEUE_TOO_BIG_EXCEPTION); @@ -2093,12 +2013,10 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { this.bindAddress = bindAddress; this.conf = conf; this.socketSendBufferSize = 0; - this.maxQueueSize = - this.conf.getInt("hbase.ipc.server.max.callqueue.size", DEFAULT_MAX_CALLQUEUE_SIZE); + // See declaration above for documentation on what this size is. + this.maxQueueSizeInBytes = + this.conf.getLong("hbase.ipc.server.max.callqueue.size", DEFAULT_MAX_CALLQUEUE_SIZE); this.readThreads = conf.getInt("hbase.ipc.server.read.threadpool.size", 10); - this.maxIdleTime = 2 * conf.getInt("hbase.ipc.client.connection.maxidletime", 1000); - this.maxConnectionsToNuke = conf.getInt("hbase.ipc.client.kill.max", 10); - this.thresholdIdleConnections = conf.getInt("hbase.ipc.client.idlethreshold", 4000); this.purgeTimeout = conf.getLong("hbase.ipc.client.call.purge.timeout", 2 * HConstants.DEFAULT_HBASE_RPC_TIMEOUT); this.warnResponseTime = conf.getInt(WARN_RESPONSE_TIME, DEFAULT_WARN_RESPONSE_TIME); @@ -2120,6 +2038,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { // Create the responder here responder = new Responder(); + connectionManager = new ConnectionManager(); this.authorize = conf.getBoolean(HADOOP_SECURITY_AUTHORIZATION, false); this.userProvider = UserProvider.instantiate(conf); this.isSecurityEnabled = userProvider.isHBaseSecurityEnabled(); @@ -2177,12 +2096,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { } protected void closeConnection(Connection connection) { - synchronized (connectionList) { - if (connectionList.remove(connection)) { - numConnections--; - } - } - connection.close(); + connectionManager.close(connection); } Configuration getConf() { @@ -2440,7 +2354,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { @Override public void addCallSize(final long diff) { - this.callQueueSize.add(diff); + this.callQueueSizeInBytes.add(diff); } /** @@ -2577,6 +2491,14 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { return ctx == null? null: ctx.getRequestUser(); } + /** + * The number of open RPC conections + * @return the number of open rpc connections + */ + public int getNumOpenConnections() { + return connectionManager.size(); + } + /** * Returns the username for any user associated with the current RPC * request or null if no user is set. @@ -2695,4 +2617,149 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { public RpcScheduler getScheduler() { return scheduler; } + + private class ConnectionManager { + final private AtomicInteger count = new AtomicInteger(); + final private Set connections; + + final private Timer idleScanTimer; + final private int idleScanThreshold; + final private int idleScanInterval; + final private int maxIdleTime; + final private int maxIdleToClose; + + ConnectionManager() { + this.idleScanTimer = new Timer("RpcServer idle connection scanner for port " + port, true); + this.idleScanThreshold = conf.getInt("hbase.ipc.client.idlethreshold", 4000); + this.idleScanInterval = + conf.getInt("hbase.ipc.client.connection.idle-scan-interval.ms", 10000); + this.maxIdleTime = 2 * conf.getInt("hbase.ipc.client.connection.maxidletime", 10000); + this.maxIdleToClose = conf.getInt("hbase.ipc.client.kill.max", 10); + int handlerCount = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT, + HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT); + int maxConnectionQueueSize = + handlerCount * conf.getInt("hbase.ipc.server.handler.queue.size", 100); + // create a set with concurrency -and- a thread-safe iterator, add 2 + // for listener and idle closer threads + this.connections = Collections.newSetFromMap( + new ConcurrentHashMap( + maxConnectionQueueSize, 0.75f, readThreads+2)); + } + + private boolean add(Connection connection) { + boolean added = connections.add(connection); + if (added) { + count.getAndIncrement(); + } + return added; + } + + private boolean remove(Connection connection) { + boolean removed = connections.remove(connection); + if (removed) { + count.getAndDecrement(); + } + return removed; + } + + int size() { + return count.get(); + } + + Connection[] toArray() { + return connections.toArray(new Connection[0]); + } + + Connection register(SocketChannel channel) { + Connection connection = new Connection(channel, System.currentTimeMillis()); + add(connection); + if (LOG.isDebugEnabled()) { + LOG.debug("Server connection from " + connection + + "; connections=" + size() + + ", queued calls size (bytes)=" + callQueueSizeInBytes.get() + + ", general queued calls=" + scheduler.getGeneralQueueLength() + + ", priority queued calls=" + scheduler.getPriorityQueueLength()); + } + return connection; + } + + boolean close(Connection connection) { + boolean exists = remove(connection); + if (exists) { + if (LOG.isDebugEnabled()) { + LOG.debug(Thread.currentThread().getName() + + ": disconnecting client " + connection + + ". Number of active connections: "+ size()); + } + // only close if actually removed to avoid double-closing due + // to possible races + connection.close(); + } + return exists; + } + + // synch'ed to avoid explicit invocation upon OOM from colliding with + // timer task firing + synchronized void closeIdle(boolean scanAll) { + long minLastContact = System.currentTimeMillis() - maxIdleTime; + // concurrent iterator might miss new connections added + // during the iteration, but that's ok because they won't + // be idle yet anyway and will be caught on next scan + int closed = 0; + for (Connection connection : connections) { + // stop if connections dropped below threshold unless scanning all + if (!scanAll && size() < idleScanThreshold) { + break; + } + // stop if not scanning all and max connections are closed + if (connection.isIdle() && + connection.getLastContact() < minLastContact && + close(connection) && + !scanAll && (++closed == maxIdleToClose)) { + break; + } + } + } + + void closeAll() { + // use a copy of the connections to be absolutely sure the concurrent + // iterator doesn't miss a connection + for (Connection connection : toArray()) { + close(connection); + } + } + + void startIdleScan() { + scheduleIdleScanTask(); + } + + void stopIdleScan() { + idleScanTimer.cancel(); + } + + private void scheduleIdleScanTask() { + if (!running) { + return; + } + TimerTask idleScanTask = new TimerTask(){ + @Override + public void run() { + if (!running) { + return; + } + if (LOG.isDebugEnabled()) { + LOG.debug(Thread.currentThread().getName()+": task running"); + } + try { + closeIdle(false); + } finally { + // explicitly reschedule so next execution occurs relative + // to the end of this scan, not the beginning + scheduleIdleScanTask(); + } + } + }; + idleScanTimer.schedule(idleScanTask, idleScanInterval); + } + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SimpleRpcSchedulerFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SimpleRpcSchedulerFactory.java index 1f496b44c00..743c5bb89ad 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SimpleRpcSchedulerFactory.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SimpleRpcSchedulerFactory.java @@ -41,7 +41,7 @@ public class SimpleRpcSchedulerFactory implements RpcSchedulerFactory { @Override public RpcScheduler create(Configuration conf, PriorityFunction priority, Abortable server) { int handlerCount = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT, - HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT); + HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT); return new SimpleRpcScheduler( conf, diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java index ceb945b9fcb..45cec789910 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java @@ -263,7 +263,7 @@ public abstract class AbstractTestIPC { fail("Expected an exception to have been thrown!"); } catch (Exception e) { LOG.info("Caught expected exception: " + e.toString()); - assertTrue(StringUtils.stringifyException(e).contains("Injected fault")); + assertTrue(e.toString(), StringUtils.stringifyException(e).contains("Injected fault")); } finally { rpcServer.stop(); }