HBASE-15948 Port "HADOOP-9956 RPC listener inefficiently assigns connections to readers"
Adds HADOOP-9955 RPC idle connection closing is extremely inefficient Then removes queue added by HADOOP-9956 at Enis suggestion Changes how we do accounting of Connections to match how it is done in Hadoop. Adds a ConnectionManager class. Adds new configurations for this new class. "hbase.ipc.client.idlethreshold" 4000 "hbase.ipc.client.connection.idle-scan-interval.ms" 10000 "hbase.ipc.client.connection.maxidletime" 10000 "hbase.ipc.client.kill.max", 10 "hbase.ipc.server.handler.queue.size", 100 The new scheme does away with synchronization that purportedly would freeze out reads while we were cleaning up stale connections (according to HADOOP-9955) Also adds in new mechanism for accepting Connections by pulling in as many as we can at a time adding them to a Queue instead of doing one at a time. Can help when bursty traffic according to HADOOP-9956. Removes a blocking while Reader is busy parsing a request. Adds configuration "hbase.ipc.server.read.connection-queue.size" with default of 100 for queue size.
This commit is contained in:
parent
da88b48240
commit
e0b70c00e7
|
@ -52,14 +52,16 @@ public interface MetricsHBaseServerSource extends BaseSource {
|
||||||
String TOTAL_CALL_TIME_NAME = "totalCallTime";
|
String TOTAL_CALL_TIME_NAME = "totalCallTime";
|
||||||
String TOTAL_CALL_TIME_DESC = "Total call time, including both queued and processing time.";
|
String TOTAL_CALL_TIME_DESC = "Total call time, including both queued and processing time.";
|
||||||
String QUEUE_SIZE_NAME = "queueSize";
|
String QUEUE_SIZE_NAME = "queueSize";
|
||||||
String QUEUE_SIZE_DESC = "Number of bytes in the call queues.";
|
String QUEUE_SIZE_DESC = "Number of bytes in the call queues; request has been read and " +
|
||||||
|
"parsed and is waiting to run or is currently being executed.";
|
||||||
String GENERAL_QUEUE_NAME = "numCallsInGeneralQueue";
|
String GENERAL_QUEUE_NAME = "numCallsInGeneralQueue";
|
||||||
String GENERAL_QUEUE_DESC = "Number of calls in the general call queue.";
|
String GENERAL_QUEUE_DESC = "Number of calls in the general call queue; " +
|
||||||
|
"parsed requests waiting in scheduler to be executed";
|
||||||
String PRIORITY_QUEUE_NAME = "numCallsInPriorityQueue";
|
String PRIORITY_QUEUE_NAME = "numCallsInPriorityQueue";
|
||||||
String REPLICATION_QUEUE_NAME = "numCallsInReplicationQueue";
|
String REPLICATION_QUEUE_NAME = "numCallsInReplicationQueue";
|
||||||
String REPLICATION_QUEUE_DESC =
|
String REPLICATION_QUEUE_DESC =
|
||||||
"Number of calls in the replication call queue.";
|
"Number of calls in the replication call queue waiting to be run";
|
||||||
String PRIORITY_QUEUE_DESC = "Number of calls in the priority call queue.";
|
String PRIORITY_QUEUE_DESC = "Number of calls in the priority call queue waiting to be run";
|
||||||
String NUM_OPEN_CONNECTIONS_NAME = "numOpenConnections";
|
String NUM_OPEN_CONNECTIONS_NAME = "numOpenConnections";
|
||||||
String NUM_OPEN_CONNECTIONS_DESC = "Number of open connections.";
|
String NUM_OPEN_CONNECTIONS_DESC = "Number of open connections.";
|
||||||
String NUM_ACTIVE_HANDLER_NAME = "numActiveHandler";
|
String NUM_ACTIVE_HANDLER_NAME = "numActiveHandler";
|
||||||
|
|
|
@ -36,7 +36,7 @@ public class MetricsHBaseServerWrapperImpl implements MetricsHBaseServerWrapper
|
||||||
if (!isServerStarted()) {
|
if (!isServerStarted()) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
return server.callQueueSize.get();
|
return server.callQueueSizeInBytes.get();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -65,10 +65,10 @@ public class MetricsHBaseServerWrapperImpl implements MetricsHBaseServerWrapper
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int getNumOpenConnections() {
|
public int getNumOpenConnections() {
|
||||||
if (!isServerStarted() || this.server.connectionList == null) {
|
if (!isServerStarted()) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
return server.connectionList.size();
|
return server.getNumOpenConnections();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -48,15 +48,16 @@ import java.util.Arrays;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.LinkedList;
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Random;
|
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
import java.util.Timer;
|
||||||
|
import java.util.TimerTask;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.ConcurrentLinkedDeque;
|
import java.util.concurrent.ConcurrentLinkedDeque;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.Executors;
|
import java.util.concurrent.Executors;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
import java.util.concurrent.locks.Lock;
|
import java.util.concurrent.locks.Lock;
|
||||||
import java.util.concurrent.locks.ReentrantLock;
|
import java.util.concurrent.locks.ReentrantLock;
|
||||||
|
|
||||||
|
@ -113,6 +114,7 @@ import org.apache.hadoop.hbase.util.Counter;
|
||||||
import org.apache.hadoop.hbase.util.Pair;
|
import org.apache.hadoop.hbase.util.Pair;
|
||||||
import org.apache.hadoop.hbase.util.Threads;
|
import org.apache.hadoop.hbase.util.Threads;
|
||||||
import org.apache.hadoop.io.BytesWritable;
|
import org.apache.hadoop.io.BytesWritable;
|
||||||
|
import org.apache.hadoop.io.IOUtils;
|
||||||
import org.apache.hadoop.io.IntWritable;
|
import org.apache.hadoop.io.IntWritable;
|
||||||
import org.apache.hadoop.io.Writable;
|
import org.apache.hadoop.io.Writable;
|
||||||
import org.apache.hadoop.io.WritableUtils;
|
import org.apache.hadoop.io.WritableUtils;
|
||||||
|
@ -183,11 +185,6 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
|
||||||
*/
|
*/
|
||||||
static final int DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER = 10;
|
static final int DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER = 10;
|
||||||
|
|
||||||
/**
|
|
||||||
* The maximum size that we can hold in the RPC queue
|
|
||||||
*/
|
|
||||||
private static final int DEFAULT_MAX_CALLQUEUE_SIZE = 1024 * 1024 * 1024;
|
|
||||||
|
|
||||||
private final IPCUtil ipcUtil;
|
private final IPCUtil ipcUtil;
|
||||||
|
|
||||||
private static final String AUTH_FAILED_FOR = "Auth failed for ";
|
private static final String AUTH_FAILED_FOR = "Auth failed for ";
|
||||||
|
@ -210,22 +207,30 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
|
||||||
protected int port; // port we listen on
|
protected int port; // port we listen on
|
||||||
protected InetSocketAddress address; // inet address we listen on
|
protected InetSocketAddress address; // inet address we listen on
|
||||||
private int readThreads; // number of read threads
|
private int readThreads; // number of read threads
|
||||||
protected int maxIdleTime; // the maximum idle time after
|
|
||||||
// which a client may be
|
|
||||||
// disconnected
|
|
||||||
protected int thresholdIdleConnections; // the number of idle
|
|
||||||
// connections after which we
|
|
||||||
// will start cleaning up idle
|
|
||||||
// connections
|
|
||||||
int maxConnectionsToNuke; // the max number of
|
|
||||||
// connections to nuke
|
|
||||||
// during a cleanup
|
|
||||||
|
|
||||||
protected MetricsHBaseServer metrics;
|
protected MetricsHBaseServer metrics;
|
||||||
|
|
||||||
protected final Configuration conf;
|
protected final Configuration conf;
|
||||||
|
|
||||||
private int maxQueueSize;
|
/**
|
||||||
|
* Maximum size in bytes of the currently queued and running Calls. If a new Call puts us over
|
||||||
|
* this size, then we will reject the call (after parsing it though). It will go back to the
|
||||||
|
* client and client will retry. Set this size with "hbase.ipc.server.max.callqueue.size". The
|
||||||
|
* call queue size gets incremented after we parse a call and before we add it to the queue of
|
||||||
|
* calls for the scheduler to use. It get decremented after we have 'run' the Call. The current
|
||||||
|
* size is kept in {@link #callQueueSizeInBytes}.
|
||||||
|
* @see {@link #callQueueSizeInBytes}
|
||||||
|
* @see {@link #DEFAULT_MAX_CALLQUEUE_SIZE}
|
||||||
|
* @see {@link #callQueueSizeInBytes}
|
||||||
|
*/
|
||||||
|
private final long maxQueueSizeInBytes;
|
||||||
|
private static final int DEFAULT_MAX_CALLQUEUE_SIZE = 1024 * 1024 * 1024;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This is a running count of the size in bytes of all outstanding calls whether currently
|
||||||
|
* executing or queued waiting to be run.
|
||||||
|
*/
|
||||||
|
protected final Counter callQueueSizeInBytes = new Counter();
|
||||||
|
|
||||||
protected int socketSendBufferSize;
|
protected int socketSendBufferSize;
|
||||||
protected final boolean tcpNoDelay; // if T then disable Nagle's Algorithm
|
protected final boolean tcpNoDelay; // if T then disable Nagle's Algorithm
|
||||||
protected final boolean tcpKeepAlive; // if T then use keepalives
|
protected final boolean tcpKeepAlive; // if T then use keepalives
|
||||||
|
@ -244,19 +249,11 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
|
||||||
*/
|
*/
|
||||||
volatile boolean started = false;
|
volatile boolean started = false;
|
||||||
|
|
||||||
/**
|
// maintains the set of client connections and handles idle timeouts
|
||||||
* This is a running count of the size of all outstanding calls by size.
|
private ConnectionManager connectionManager;
|
||||||
*/
|
|
||||||
protected final Counter callQueueSize = new Counter();
|
|
||||||
|
|
||||||
protected final List<Connection> connectionList =
|
|
||||||
Collections.synchronizedList(new LinkedList<Connection>());
|
|
||||||
//maintain a list
|
|
||||||
//of client connections
|
|
||||||
private Listener listener = null;
|
private Listener listener = null;
|
||||||
protected Responder responder = null;
|
protected Responder responder = null;
|
||||||
protected AuthenticationTokenSecretManager authTokenSecretMgr = null;
|
protected AuthenticationTokenSecretManager authTokenSecretMgr = null;
|
||||||
protected int numConnections = 0;
|
|
||||||
|
|
||||||
protected HBaseRPCErrorHandler errorHandler = null;
|
protected HBaseRPCErrorHandler errorHandler = null;
|
||||||
|
|
||||||
|
@ -623,18 +620,13 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
|
||||||
private Selector selector = null; //the selector that we use for the server
|
private Selector selector = null; //the selector that we use for the server
|
||||||
private Reader[] readers = null;
|
private Reader[] readers = null;
|
||||||
private int currentReader = 0;
|
private int currentReader = 0;
|
||||||
private Random rand = new Random();
|
|
||||||
private long lastCleanupRunTime = 0; //the last time when a cleanup connec-
|
|
||||||
//-tion (for idle connections) ran
|
|
||||||
private long cleanupInterval = 10000; //the minimum interval between
|
|
||||||
//two cleanup runs
|
|
||||||
private int backlogLength;
|
|
||||||
|
|
||||||
private ExecutorService readPool;
|
private ExecutorService readPool;
|
||||||
|
|
||||||
public Listener(final String name) throws IOException {
|
public Listener(final String name) throws IOException {
|
||||||
super(name);
|
super(name);
|
||||||
backlogLength = conf.getInt("hbase.ipc.server.listen.queue.size", 128);
|
// The backlog of requests that we will have the serversocket carry.
|
||||||
|
int backlogLength = conf.getInt("hbase.ipc.server.listen.queue.size", 128);
|
||||||
// Create a new server socket and set to non blocking mode
|
// Create a new server socket and set to non blocking mode
|
||||||
acceptChannel = ServerSocketChannel.open();
|
acceptChannel = ServerSocketChannel.open();
|
||||||
acceptChannel.configureBlocking(false);
|
acceptChannel.configureBlocking(false);
|
||||||
|
@ -647,6 +639,8 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
|
||||||
selector = Selector.open();
|
selector = Selector.open();
|
||||||
|
|
||||||
readers = new Reader[readThreads];
|
readers = new Reader[readThreads];
|
||||||
|
// Why this executor thing? Why not like hadoop just start up all the threads? I suppose it
|
||||||
|
// has an advantage in that it is easy to shutdown the pool.
|
||||||
readPool = Executors.newFixedThreadPool(readThreads,
|
readPool = Executors.newFixedThreadPool(readThreads,
|
||||||
new ThreadFactoryBuilder().setNameFormat(
|
new ThreadFactoryBuilder().setNameFormat(
|
||||||
"RpcServer.reader=%d,bindAddress=" + bindAddress.getHostName() +
|
"RpcServer.reader=%d,bindAddress=" + bindAddress.getHostName() +
|
||||||
|
@ -667,12 +661,12 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
|
||||||
|
|
||||||
|
|
||||||
private class Reader implements Runnable {
|
private class Reader implements Runnable {
|
||||||
private volatile boolean adding = false;
|
|
||||||
private final Selector readSelector;
|
private final Selector readSelector;
|
||||||
|
|
||||||
Reader() throws IOException {
|
Reader() throws IOException {
|
||||||
this.readSelector = Selector.open();
|
this.readSelector = Selector.open();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
try {
|
try {
|
||||||
|
@ -686,14 +680,10 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private synchronized void doRunLoop() {
|
private void doRunLoop() {
|
||||||
while (running) {
|
while (running) {
|
||||||
try {
|
try {
|
||||||
readSelector.select();
|
readSelector.select();
|
||||||
while (adding) {
|
|
||||||
this.wait(1000);
|
|
||||||
}
|
|
||||||
|
|
||||||
Iterator<SelectionKey> iter = readSelector.selectedKeys().iterator();
|
Iterator<SelectionKey> iter = readSelector.selectedKeys().iterator();
|
||||||
while (iter.hasNext()) {
|
while (iter.hasNext()) {
|
||||||
SelectionKey key = iter.next();
|
SelectionKey key = iter.next();
|
||||||
|
@ -703,9 +693,12 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
|
||||||
doRead(key);
|
doRead(key);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
key = null;
|
||||||
}
|
}
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
LOG.debug("Interrupted while sleeping");
|
if (running) { // unexpected -- log it
|
||||||
|
LOG.info(Thread.currentThread().getName() + " unexpectedly interrupted", e);
|
||||||
|
}
|
||||||
return;
|
return;
|
||||||
} catch (IOException ex) {
|
} catch (IOException ex) {
|
||||||
LOG.info(getName() + ": IOException in Reader", ex);
|
LOG.info(getName() + ": IOException in Reader", ex);
|
||||||
|
@ -714,76 +707,14 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This gets reader into the state that waits for the new channel
|
* Updating the readSelector while it's being used is not thread-safe,
|
||||||
* to be registered with readSelector. If it was waiting in select()
|
* so the connection must be queued. The reader will drain the queue
|
||||||
* the thread will be woken up, otherwise whenever select() is called
|
* and update its readSelector before performing the next select
|
||||||
* it will return even if there is nothing to read and wait
|
|
||||||
* in while(adding) for finishAdd call
|
|
||||||
*/
|
*/
|
||||||
public void startAdd() {
|
public void addConnection(Connection conn) throws IOException {
|
||||||
adding = true;
|
conn.channel.register(readSelector, SelectionKey.OP_READ, conn);
|
||||||
readSelector.wakeup();
|
readSelector.wakeup();
|
||||||
}
|
}
|
||||||
|
|
||||||
public synchronized SelectionKey registerChannel(SocketChannel channel)
|
|
||||||
throws IOException {
|
|
||||||
return channel.register(readSelector, SelectionKey.OP_READ);
|
|
||||||
}
|
|
||||||
|
|
||||||
public synchronized void finishAdd() {
|
|
||||||
adding = false;
|
|
||||||
this.notify();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/** cleanup connections from connectionList. Choose a random range
|
|
||||||
* to scan and also have a limit on the number of the connections
|
|
||||||
* that will be cleanedup per run. The criteria for cleanup is the time
|
|
||||||
* for which the connection was idle. If 'force' is true then all
|
|
||||||
* connections will be looked at for the cleanup.
|
|
||||||
* @param force all connections will be looked at for cleanup
|
|
||||||
*/
|
|
||||||
private void cleanupConnections(boolean force) {
|
|
||||||
if (force || numConnections > thresholdIdleConnections) {
|
|
||||||
long currentTime = System.currentTimeMillis();
|
|
||||||
if (!force && (currentTime - lastCleanupRunTime) < cleanupInterval) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
int start = 0;
|
|
||||||
int end = numConnections - 1;
|
|
||||||
if (!force) {
|
|
||||||
start = rand.nextInt() % numConnections;
|
|
||||||
end = rand.nextInt() % numConnections;
|
|
||||||
int temp;
|
|
||||||
if (end < start) {
|
|
||||||
temp = start;
|
|
||||||
start = end;
|
|
||||||
end = temp;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
int i = start;
|
|
||||||
int numNuked = 0;
|
|
||||||
while (i <= end) {
|
|
||||||
Connection c;
|
|
||||||
synchronized (connectionList) {
|
|
||||||
try {
|
|
||||||
c = connectionList.get(i);
|
|
||||||
} catch (Exception e) {return;}
|
|
||||||
}
|
|
||||||
if (c.timedOut(currentTime)) {
|
|
||||||
if (LOG.isDebugEnabled())
|
|
||||||
LOG.debug(getName() + ": disconnecting client " + c.getHostAddress());
|
|
||||||
closeConnection(c);
|
|
||||||
numNuked++;
|
|
||||||
end--;
|
|
||||||
//noinspection UnusedAssignment
|
|
||||||
c = null;
|
|
||||||
if (!force && numNuked == maxConnectionsToNuke) break;
|
|
||||||
}
|
|
||||||
else i++;
|
|
||||||
}
|
|
||||||
lastCleanupRunTime = System.currentTimeMillis();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -792,6 +723,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
|
||||||
"it will have per impact")
|
"it will have per impact")
|
||||||
public void run() {
|
public void run() {
|
||||||
LOG.info(getName() + ": starting");
|
LOG.info(getName() + ": starting");
|
||||||
|
connectionManager.startIdleScan();
|
||||||
while (running) {
|
while (running) {
|
||||||
SelectionKey key = null;
|
SelectionKey key = null;
|
||||||
try {
|
try {
|
||||||
|
@ -815,7 +747,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
|
||||||
if (errorHandler.checkOOME(e)) {
|
if (errorHandler.checkOOME(e)) {
|
||||||
LOG.info(getName() + ": exiting on OutOfMemoryError");
|
LOG.info(getName() + ": exiting on OutOfMemoryError");
|
||||||
closeCurrentConnection(key, e);
|
closeCurrentConnection(key, e);
|
||||||
cleanupConnections(true);
|
connectionManager.closeIdle(true);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
@ -824,22 +756,18 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
|
||||||
// some thread(s) a chance to finish
|
// some thread(s) a chance to finish
|
||||||
LOG.warn(getName() + ": OutOfMemoryError in server select", e);
|
LOG.warn(getName() + ": OutOfMemoryError in server select", e);
|
||||||
closeCurrentConnection(key, e);
|
closeCurrentConnection(key, e);
|
||||||
cleanupConnections(true);
|
connectionManager.closeIdle(true);
|
||||||
try {
|
try {
|
||||||
Thread.sleep(60000);
|
Thread.sleep(60000);
|
||||||
} catch (InterruptedException ex) {
|
} catch (InterruptedException ex) {
|
||||||
LOG.debug("Interrupted while sleeping");
|
LOG.debug("Interrupted while sleeping");
|
||||||
return;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
closeCurrentConnection(key, e);
|
closeCurrentConnection(key, e);
|
||||||
}
|
}
|
||||||
cleanupConnections(false);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
LOG.info(getName() + ": stopping");
|
LOG.info(getName() + ": stopping");
|
||||||
|
|
||||||
synchronized (this) {
|
synchronized (this) {
|
||||||
try {
|
try {
|
||||||
acceptChannel.close();
|
acceptChannel.close();
|
||||||
|
@ -851,10 +779,9 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
|
||||||
selector= null;
|
selector= null;
|
||||||
acceptChannel= null;
|
acceptChannel= null;
|
||||||
|
|
||||||
// clean up all connections
|
// close all connections
|
||||||
while (!connectionList.isEmpty()) {
|
connectionManager.stopIdleScan();
|
||||||
closeConnection(connectionList.remove(0));
|
connectionManager.closeAll();
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -862,10 +789,6 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
|
||||||
if (key != null) {
|
if (key != null) {
|
||||||
Connection c = (Connection)key.attachment();
|
Connection c = (Connection)key.attachment();
|
||||||
if (c != null) {
|
if (c != null) {
|
||||||
if (LOG.isDebugEnabled()) {
|
|
||||||
LOG.debug(getName() + ": disconnecting client " + c.getHostAddress() +
|
|
||||||
(e != null ? " on error " + e.getMessage() : ""));
|
|
||||||
}
|
|
||||||
closeConnection(c);
|
closeConnection(c);
|
||||||
key.attach(null);
|
key.attach(null);
|
||||||
}
|
}
|
||||||
|
@ -876,37 +799,24 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
|
||||||
return address;
|
return address;
|
||||||
}
|
}
|
||||||
|
|
||||||
void doAccept(SelectionKey key) throws IOException, OutOfMemoryError {
|
void doAccept(SelectionKey key) throws InterruptedException, IOException, OutOfMemoryError {
|
||||||
Connection c;
|
|
||||||
ServerSocketChannel server = (ServerSocketChannel) key.channel();
|
ServerSocketChannel server = (ServerSocketChannel) key.channel();
|
||||||
|
|
||||||
SocketChannel channel;
|
SocketChannel channel;
|
||||||
while ((channel = server.accept()) != null) {
|
while ((channel = server.accept()) != null) {
|
||||||
try {
|
|
||||||
channel.configureBlocking(false);
|
channel.configureBlocking(false);
|
||||||
channel.socket().setTcpNoDelay(tcpNoDelay);
|
channel.socket().setTcpNoDelay(tcpNoDelay);
|
||||||
channel.socket().setKeepAlive(tcpKeepAlive);
|
channel.socket().setKeepAlive(tcpKeepAlive);
|
||||||
} catch (IOException ioe) {
|
|
||||||
channel.close();
|
|
||||||
throw ioe;
|
|
||||||
}
|
|
||||||
|
|
||||||
Reader reader = getReader();
|
Reader reader = getReader();
|
||||||
try {
|
Connection c = connectionManager.register(channel);
|
||||||
reader.startAdd();
|
// If the connectionManager can't take it, close the connection.
|
||||||
SelectionKey readKey = reader.registerChannel(channel);
|
if (c == null) {
|
||||||
c = getConnection(channel, System.currentTimeMillis());
|
if (channel.isOpen()) {
|
||||||
readKey.attach(c);
|
IOUtils.cleanup(null, channel);
|
||||||
synchronized (connectionList) {
|
|
||||||
connectionList.add(numConnections, c);
|
|
||||||
numConnections++;
|
|
||||||
}
|
}
|
||||||
if (LOG.isDebugEnabled())
|
continue;
|
||||||
LOG.debug(getName() + ": connection from " + c.toString() +
|
|
||||||
"; # active connections: " + numConnections);
|
|
||||||
} finally {
|
|
||||||
reader.finishAdd();
|
|
||||||
}
|
}
|
||||||
|
key.attach(c); // so closeCurrentConnection can get the object
|
||||||
|
reader.addConnection(c);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -919,12 +829,8 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
|
||||||
c.setLastContact(System.currentTimeMillis());
|
c.setLastContact(System.currentTimeMillis());
|
||||||
try {
|
try {
|
||||||
count = c.readAndProcess();
|
count = c.readAndProcess();
|
||||||
|
|
||||||
if (count > 0) {
|
|
||||||
c.setLastContact(System.currentTimeMillis());
|
|
||||||
}
|
|
||||||
|
|
||||||
} catch (InterruptedException ieo) {
|
} catch (InterruptedException ieo) {
|
||||||
|
LOG.info(Thread.currentThread().getName() + ": readAndProcess caught InterruptedException", ieo);
|
||||||
throw ieo;
|
throw ieo;
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
|
@ -933,12 +839,10 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
|
||||||
count = -1; //so that the (count < 0) block is executed
|
count = -1; //so that the (count < 0) block is executed
|
||||||
}
|
}
|
||||||
if (count < 0) {
|
if (count < 0) {
|
||||||
if (LOG.isDebugEnabled()) {
|
|
||||||
LOG.debug(getName() + ": DISCONNECTING client " + c.toString() +
|
|
||||||
" because read count=" + count +
|
|
||||||
". Number of active connections: " + numConnections);
|
|
||||||
}
|
|
||||||
closeConnection(c);
|
closeConnection(c);
|
||||||
|
c = null;
|
||||||
|
} else {
|
||||||
|
c.setLastContact(System.currentTimeMillis());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1355,6 +1259,10 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public long getLastContact() {
|
||||||
|
return lastContact;
|
||||||
|
}
|
||||||
|
|
||||||
/* Return true if the connection has no outstanding rpc */
|
/* Return true if the connection has no outstanding rpc */
|
||||||
private boolean isIdle() {
|
private boolean isIdle() {
|
||||||
return rpcCount.get() == 0;
|
return rpcCount.get() == 0;
|
||||||
|
@ -1370,10 +1278,6 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
|
||||||
rpcCount.increment();
|
rpcCount.increment();
|
||||||
}
|
}
|
||||||
|
|
||||||
protected boolean timedOut(long currentTime) {
|
|
||||||
return isIdle() && currentTime - lastContact > maxIdleTime;
|
|
||||||
}
|
|
||||||
|
|
||||||
private UserGroupInformation getAuthorizedUgi(String authorizedId)
|
private UserGroupInformation getAuthorizedUgi(String authorizedId)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
UserGroupInformation authorizedUgi;
|
UserGroupInformation authorizedUgi;
|
||||||
|
@ -1883,7 +1787,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
|
||||||
}
|
}
|
||||||
// Enforcing the call queue size, this triggers a retry in the client
|
// Enforcing the call queue size, this triggers a retry in the client
|
||||||
// This is a bit late to be doing this check - we have already read in the total request.
|
// This is a bit late to be doing this check - we have already read in the total request.
|
||||||
if ((totalRequestSize + callQueueSize.get()) > maxQueueSize) {
|
if ((totalRequestSize + callQueueSizeInBytes.get()) > maxQueueSizeInBytes) {
|
||||||
final Call callTooBig =
|
final Call callTooBig =
|
||||||
new Call(id, this.service, null, null, null, null, this,
|
new Call(id, this.service, null, null, null, null, this,
|
||||||
responder, totalRequestSize, null, null, 0);
|
responder, totalRequestSize, null, null, 0);
|
||||||
|
@ -1954,7 +1858,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
|
||||||
totalRequestSize, traceInfo, this.addr, timeout);
|
totalRequestSize, traceInfo, this.addr, timeout);
|
||||||
|
|
||||||
if (!scheduler.dispatch(new CallRunner(RpcServer.this, call))) {
|
if (!scheduler.dispatch(new CallRunner(RpcServer.this, call))) {
|
||||||
callQueueSize.add(-1 * call.getSize());
|
callQueueSizeInBytes.add(-1 * call.getSize());
|
||||||
|
|
||||||
ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
|
ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
|
||||||
metrics.exception(CALL_QUEUE_TOO_BIG_EXCEPTION);
|
metrics.exception(CALL_QUEUE_TOO_BIG_EXCEPTION);
|
||||||
|
@ -2093,12 +1997,10 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
|
||||||
this.bindAddress = bindAddress;
|
this.bindAddress = bindAddress;
|
||||||
this.conf = conf;
|
this.conf = conf;
|
||||||
this.socketSendBufferSize = 0;
|
this.socketSendBufferSize = 0;
|
||||||
this.maxQueueSize =
|
// See declaration above for documentation on what this size is.
|
||||||
this.conf.getInt("hbase.ipc.server.max.callqueue.size", DEFAULT_MAX_CALLQUEUE_SIZE);
|
this.maxQueueSizeInBytes =
|
||||||
|
this.conf.getLong("hbase.ipc.server.max.callqueue.size", DEFAULT_MAX_CALLQUEUE_SIZE);
|
||||||
this.readThreads = conf.getInt("hbase.ipc.server.read.threadpool.size", 10);
|
this.readThreads = conf.getInt("hbase.ipc.server.read.threadpool.size", 10);
|
||||||
this.maxIdleTime = 2 * conf.getInt("hbase.ipc.client.connection.maxidletime", 1000);
|
|
||||||
this.maxConnectionsToNuke = conf.getInt("hbase.ipc.client.kill.max", 10);
|
|
||||||
this.thresholdIdleConnections = conf.getInt("hbase.ipc.client.idlethreshold", 4000);
|
|
||||||
this.purgeTimeout = conf.getLong("hbase.ipc.client.call.purge.timeout",
|
this.purgeTimeout = conf.getLong("hbase.ipc.client.call.purge.timeout",
|
||||||
2 * HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
|
2 * HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
|
||||||
this.warnResponseTime = conf.getInt(WARN_RESPONSE_TIME, DEFAULT_WARN_RESPONSE_TIME);
|
this.warnResponseTime = conf.getInt(WARN_RESPONSE_TIME, DEFAULT_WARN_RESPONSE_TIME);
|
||||||
|
@ -2120,6 +2022,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
|
||||||
|
|
||||||
// Create the responder here
|
// Create the responder here
|
||||||
responder = new Responder();
|
responder = new Responder();
|
||||||
|
connectionManager = new ConnectionManager();
|
||||||
this.authorize = conf.getBoolean(HADOOP_SECURITY_AUTHORIZATION, false);
|
this.authorize = conf.getBoolean(HADOOP_SECURITY_AUTHORIZATION, false);
|
||||||
this.userProvider = UserProvider.instantiate(conf);
|
this.userProvider = UserProvider.instantiate(conf);
|
||||||
this.isSecurityEnabled = userProvider.isHBaseSecurityEnabled();
|
this.isSecurityEnabled = userProvider.isHBaseSecurityEnabled();
|
||||||
|
@ -2177,12 +2080,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void closeConnection(Connection connection) {
|
protected void closeConnection(Connection connection) {
|
||||||
synchronized (connectionList) {
|
connectionManager.close(connection);
|
||||||
if (connectionList.remove(connection)) {
|
|
||||||
numConnections--;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
connection.close();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
Configuration getConf() {
|
Configuration getConf() {
|
||||||
|
@ -2440,7 +2338,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void addCallSize(final long diff) {
|
public void addCallSize(final long diff) {
|
||||||
this.callQueueSize.add(diff);
|
this.callQueueSizeInBytes.add(diff);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -2577,6 +2475,14 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
|
||||||
return ctx == null? null: ctx.getRequestUser();
|
return ctx == null? null: ctx.getRequestUser();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The number of open RPC conections
|
||||||
|
* @return the number of open rpc connections
|
||||||
|
*/
|
||||||
|
public int getNumOpenConnections() {
|
||||||
|
return connectionManager.size();
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns the username for any user associated with the current RPC
|
* Returns the username for any user associated with the current RPC
|
||||||
* request or <code>null</code> if no user is set.
|
* request or <code>null</code> if no user is set.
|
||||||
|
@ -2695,4 +2601,150 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
|
||||||
public RpcScheduler getScheduler() {
|
public RpcScheduler getScheduler() {
|
||||||
return scheduler;
|
return scheduler;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private class ConnectionManager {
|
||||||
|
final private AtomicInteger count = new AtomicInteger();
|
||||||
|
final private Set<Connection> connections;
|
||||||
|
|
||||||
|
final private Timer idleScanTimer;
|
||||||
|
final private int idleScanThreshold;
|
||||||
|
final private int idleScanInterval;
|
||||||
|
final private int maxIdleTime;
|
||||||
|
final private int maxIdleToClose;
|
||||||
|
|
||||||
|
ConnectionManager() {
|
||||||
|
this.idleScanTimer = new Timer("RpcServer idle connection scanner for port " + port, true);
|
||||||
|
this.idleScanThreshold = conf.getInt("hbase.ipc.client.idlethreshold", 4000);
|
||||||
|
this.idleScanInterval =
|
||||||
|
conf.getInt("hbase.ipc.client.connection.idle-scan-interval.ms", 10000);
|
||||||
|
this.maxIdleTime = 2 * conf.getInt("hbase.ipc.client.connection.maxidletime", 10000);
|
||||||
|
this.maxIdleToClose = conf.getInt("hbase.ipc.client.kill.max", 10);
|
||||||
|
int handlerCount = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
|
||||||
|
HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT);
|
||||||
|
int maxConnectionQueueSize =
|
||||||
|
handlerCount * conf.getInt("hbase.ipc.server.handler.queue.size", 100);
|
||||||
|
// create a set with concurrency -and- a thread-safe iterator, add 2
|
||||||
|
// for listener and idle closer threads
|
||||||
|
this.connections = Collections.newSetFromMap(
|
||||||
|
new ConcurrentHashMap<Connection,Boolean>(
|
||||||
|
maxConnectionQueueSize, 0.75f, readThreads+2));
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean add(Connection connection) {
|
||||||
|
boolean added = connections.add(connection);
|
||||||
|
if (added) {
|
||||||
|
count.getAndIncrement();
|
||||||
|
}
|
||||||
|
return added;
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean remove(Connection connection) {
|
||||||
|
boolean removed = connections.remove(connection);
|
||||||
|
if (removed) {
|
||||||
|
count.getAndDecrement();
|
||||||
|
}
|
||||||
|
return removed;
|
||||||
|
}
|
||||||
|
|
||||||
|
int size() {
|
||||||
|
return count.get();
|
||||||
|
}
|
||||||
|
|
||||||
|
Connection[] toArray() {
|
||||||
|
return connections.toArray(new Connection[0]);
|
||||||
|
}
|
||||||
|
|
||||||
|
Connection register(SocketChannel channel) {
|
||||||
|
Connection connection = new Connection(channel, System.currentTimeMillis());
|
||||||
|
add(connection);
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
// Use metric names
|
||||||
|
LOG.debug("Server connection from " + connection +
|
||||||
|
"; numOpenConnections=" + size() +
|
||||||
|
", queueSize(bytes)=" + callQueueSizeInBytes.get() +
|
||||||
|
", numCallsInGeneralQueue=" + scheduler.getGeneralQueueLength() +
|
||||||
|
", numCallsInPriorityQueue=" + scheduler.getPriorityQueueLength());
|
||||||
|
}
|
||||||
|
return connection;
|
||||||
|
}
|
||||||
|
|
||||||
|
boolean close(Connection connection) {
|
||||||
|
boolean exists = remove(connection);
|
||||||
|
if (exists) {
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug(Thread.currentThread().getName() +
|
||||||
|
": disconnecting client " + connection +
|
||||||
|
". Number of active connections: "+ size());
|
||||||
|
}
|
||||||
|
// only close if actually removed to avoid double-closing due
|
||||||
|
// to possible races
|
||||||
|
connection.close();
|
||||||
|
}
|
||||||
|
return exists;
|
||||||
|
}
|
||||||
|
|
||||||
|
// synch'ed to avoid explicit invocation upon OOM from colliding with
|
||||||
|
// timer task firing
|
||||||
|
synchronized void closeIdle(boolean scanAll) {
|
||||||
|
long minLastContact = System.currentTimeMillis() - maxIdleTime;
|
||||||
|
// concurrent iterator might miss new connections added
|
||||||
|
// during the iteration, but that's ok because they won't
|
||||||
|
// be idle yet anyway and will be caught on next scan
|
||||||
|
int closed = 0;
|
||||||
|
for (Connection connection : connections) {
|
||||||
|
// stop if connections dropped below threshold unless scanning all
|
||||||
|
if (!scanAll && size() < idleScanThreshold) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
// stop if not scanning all and max connections are closed
|
||||||
|
if (connection.isIdle() &&
|
||||||
|
connection.getLastContact() < minLastContact &&
|
||||||
|
close(connection) &&
|
||||||
|
!scanAll && (++closed == maxIdleToClose)) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void closeAll() {
|
||||||
|
// use a copy of the connections to be absolutely sure the concurrent
|
||||||
|
// iterator doesn't miss a connection
|
||||||
|
for (Connection connection : toArray()) {
|
||||||
|
close(connection);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void startIdleScan() {
|
||||||
|
scheduleIdleScanTask();
|
||||||
|
}
|
||||||
|
|
||||||
|
void stopIdleScan() {
|
||||||
|
idleScanTimer.cancel();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void scheduleIdleScanTask() {
|
||||||
|
if (!running) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
TimerTask idleScanTask = new TimerTask(){
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
if (!running) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug(Thread.currentThread().getName()+": task running");
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
closeIdle(false);
|
||||||
|
} finally {
|
||||||
|
// explicitly reschedule so next execution occurs relative
|
||||||
|
// to the end of this scan, not the beginning
|
||||||
|
scheduleIdleScanTask();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
idleScanTimer.schedule(idleScanTask, idleScanInterval);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -263,7 +263,7 @@ public abstract class AbstractTestIPC {
|
||||||
fail("Expected an exception to have been thrown!");
|
fail("Expected an exception to have been thrown!");
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
LOG.info("Caught expected exception: " + e.toString());
|
LOG.info("Caught expected exception: " + e.toString());
|
||||||
assertTrue(StringUtils.stringifyException(e).contains("Injected fault"));
|
assertTrue(e.toString(), StringUtils.stringifyException(e).contains("Injected fault"));
|
||||||
} finally {
|
} finally {
|
||||||
rpcServer.stop();
|
rpcServer.stop();
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue