HBASE-2941 port HADOOP-6713 - threading scalability for RPC reads - to HBase
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@997512 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
eecc426f78
commit
5581a249e8
|
@ -909,6 +909,7 @@ Release 0.21.0 - Unreleased
|
||||||
HBASE-2977 Refactor master command line to a new class
|
HBASE-2977 Refactor master command line to a new class
|
||||||
HBASE-2980 Refactor region server command line to a new class
|
HBASE-2980 Refactor region server command line to a new class
|
||||||
HBASE-2988 Support alternate compression for major compactions
|
HBASE-2988 Support alternate compression for major compactions
|
||||||
|
HBASE-2941 port HADOOP-6713 - threading scalability for RPC reads - to HBase
|
||||||
|
|
||||||
NEW FEATURES
|
NEW FEATURES
|
||||||
HBASE-1961 HBase EC2 scripts
|
HBASE-1961 HBase EC2 scripts
|
||||||
|
|
|
@ -58,6 +58,8 @@ import java.util.LinkedList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Random;
|
import java.util.Random;
|
||||||
import java.util.concurrent.BlockingQueue;
|
import java.util.concurrent.BlockingQueue;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
import java.util.concurrent.LinkedBlockingQueue;
|
import java.util.concurrent.LinkedBlockingQueue;
|
||||||
|
|
||||||
/** An abstract IPC service. IPC calls take a single {@link Writable} as a
|
/** An abstract IPC service. IPC calls take a single {@link Writable} as a
|
||||||
|
@ -129,6 +131,7 @@ public abstract class HBaseServer {
|
||||||
protected String bindAddress;
|
protected String bindAddress;
|
||||||
protected int port; // port we listen on
|
protected int port; // port we listen on
|
||||||
private int handlerCount; // number of handler threads
|
private int handlerCount; // number of handler threads
|
||||||
|
private int readThreads; // number of read threads
|
||||||
protected Class<? extends Writable> paramClass; // class of call parameters
|
protected Class<? extends Writable> paramClass; // class of call parameters
|
||||||
protected int maxIdleTime; // the maximum idle time after
|
protected int maxIdleTime; // the maximum idle time after
|
||||||
// which a client may be
|
// which a client may be
|
||||||
|
@ -227,6 +230,8 @@ public abstract class HBaseServer {
|
||||||
|
|
||||||
private ServerSocketChannel acceptChannel = null; //the accept channel
|
private ServerSocketChannel acceptChannel = null; //the accept channel
|
||||||
private Selector selector = null; //the selector that we use for the server
|
private Selector selector = null; //the selector that we use for the server
|
||||||
|
private Reader[] readers = null;
|
||||||
|
private int currentReader = 0;
|
||||||
private InetSocketAddress address; //the address we bind at
|
private InetSocketAddress address; //the address we bind at
|
||||||
private Random rand = new Random();
|
private Random rand = new Random();
|
||||||
private long lastCleanupRunTime = 0; //the last time when a cleanup connec-
|
private long lastCleanupRunTime = 0; //the last time when a cleanup connec-
|
||||||
|
@ -235,6 +240,8 @@ public abstract class HBaseServer {
|
||||||
//two cleanup runs
|
//two cleanup runs
|
||||||
private int backlogLength = conf.getInt("ipc.server.listen.queue.size", 128);
|
private int backlogLength = conf.getInt("ipc.server.listen.queue.size", 128);
|
||||||
|
|
||||||
|
private ExecutorService readPool;
|
||||||
|
|
||||||
public Listener() throws IOException {
|
public Listener() throws IOException {
|
||||||
address = new InetSocketAddress(bindAddress, port);
|
address = new InetSocketAddress(bindAddress, port);
|
||||||
// Create a new server socket and set to non blocking mode
|
// Create a new server socket and set to non blocking mode
|
||||||
|
@ -247,11 +254,86 @@ public abstract class HBaseServer {
|
||||||
// create a selector;
|
// create a selector;
|
||||||
selector= Selector.open();
|
selector= Selector.open();
|
||||||
|
|
||||||
|
readers = new Reader[readThreads];
|
||||||
|
readPool = Executors.newFixedThreadPool(readThreads);
|
||||||
|
for (int i = 0; i < readThreads; ++i) {
|
||||||
|
Selector readSelector = Selector.open();
|
||||||
|
Reader reader = new Reader(readSelector);
|
||||||
|
readers[i] = reader;
|
||||||
|
readPool.execute(reader);
|
||||||
|
}
|
||||||
|
|
||||||
// Register accepts on the server socket with the selector.
|
// Register accepts on the server socket with the selector.
|
||||||
acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
|
acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
|
||||||
this.setName("IPC Server listener on " + port);
|
this.setName("IPC Server listener on " + port);
|
||||||
this.setDaemon(true);
|
this.setDaemon(true);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
private class Reader implements Runnable {
|
||||||
|
private volatile boolean adding = false;
|
||||||
|
private Selector readSelector = null;
|
||||||
|
|
||||||
|
Reader(Selector readSelector) {
|
||||||
|
this.readSelector = readSelector;
|
||||||
|
}
|
||||||
|
public void run() {
|
||||||
|
LOG.info("Starting SocketReader");
|
||||||
|
synchronized(this) {
|
||||||
|
while (running) {
|
||||||
|
SelectionKey key = null;
|
||||||
|
try {
|
||||||
|
readSelector.select();
|
||||||
|
while (adding) {
|
||||||
|
this.wait(1000);
|
||||||
|
}
|
||||||
|
|
||||||
|
Iterator<SelectionKey> iter = readSelector.selectedKeys().iterator();
|
||||||
|
while (iter.hasNext()) {
|
||||||
|
key = iter.next();
|
||||||
|
iter.remove();
|
||||||
|
if (key.isValid()) {
|
||||||
|
if (key.isReadable()) {
|
||||||
|
doRead(key);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
key = null;
|
||||||
|
}
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
if (running) { // unexpected -- log it
|
||||||
|
LOG.info(getName() + "caught: " +
|
||||||
|
StringUtils.stringifyException(e));
|
||||||
|
}
|
||||||
|
} catch (IOException ex) {
|
||||||
|
LOG.error("Error in Reader", ex);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This gets reader into the state that waits for the new channel
|
||||||
|
* to be registered with readSelector. If it was waiting in select()
|
||||||
|
* the thread will be woken up, otherwise whenever select() is called
|
||||||
|
* it will return even if there is nothing to read and wait
|
||||||
|
* in while(adding) for finishAdd call
|
||||||
|
*/
|
||||||
|
public void startAdd() {
|
||||||
|
adding = true;
|
||||||
|
readSelector.wakeup();
|
||||||
|
}
|
||||||
|
|
||||||
|
public synchronized SelectionKey registerChannel(SocketChannel channel)
|
||||||
|
throws IOException {
|
||||||
|
return channel.register(readSelector, SelectionKey.OP_READ);
|
||||||
|
}
|
||||||
|
|
||||||
|
public synchronized void finishAdd() {
|
||||||
|
adding = false;
|
||||||
|
this.notify();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/** cleanup connections from connectionList. Choose a random range
|
/** cleanup connections from connectionList. Choose a random range
|
||||||
* to scan and also have a limit on the number of the connections
|
* to scan and also have a limit on the number of the connections
|
||||||
* that will be cleanedup per run. The criteria for cleanup is the time
|
* that will be cleanedup per run. The criteria for cleanup is the time
|
||||||
|
@ -319,8 +401,6 @@ public abstract class HBaseServer {
|
||||||
if (key.isValid()) {
|
if (key.isValid()) {
|
||||||
if (key.isAcceptable())
|
if (key.isAcceptable())
|
||||||
doAccept(key);
|
doAccept(key);
|
||||||
else if (key.isReadable())
|
|
||||||
doRead(key);
|
|
||||||
}
|
}
|
||||||
} catch (IOException ignored) {
|
} catch (IOException ignored) {
|
||||||
}
|
}
|
||||||
|
@ -343,11 +423,6 @@ public abstract class HBaseServer {
|
||||||
cleanupConnections(true);
|
cleanupConnections(true);
|
||||||
try { Thread.sleep(60000); } catch (Exception ignored) {}
|
try { Thread.sleep(60000); } catch (Exception ignored) {}
|
||||||
}
|
}
|
||||||
} catch (InterruptedException e) {
|
|
||||||
if (running) { // unexpected -- log it
|
|
||||||
LOG.info(getName() + " caught: " +
|
|
||||||
StringUtils.stringifyException(e));
|
|
||||||
}
|
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
closeCurrentConnection(key);
|
closeCurrentConnection(key);
|
||||||
}
|
}
|
||||||
|
@ -389,25 +464,30 @@ public abstract class HBaseServer {
|
||||||
void doAccept(SelectionKey key) throws IOException, OutOfMemoryError {
|
void doAccept(SelectionKey key) throws IOException, OutOfMemoryError {
|
||||||
Connection c;
|
Connection c;
|
||||||
ServerSocketChannel server = (ServerSocketChannel) key.channel();
|
ServerSocketChannel server = (ServerSocketChannel) key.channel();
|
||||||
// accept up to 10 connections
|
|
||||||
for (int i=0; i<10; i++) {
|
|
||||||
SocketChannel channel = server.accept();
|
|
||||||
if (channel==null) return;
|
|
||||||
|
|
||||||
|
SocketChannel channel;
|
||||||
|
while ((channel = server.accept()) != null) {
|
||||||
channel.configureBlocking(false);
|
channel.configureBlocking(false);
|
||||||
channel.socket().setTcpNoDelay(tcpNoDelay);
|
channel.socket().setTcpNoDelay(tcpNoDelay);
|
||||||
channel.socket().setKeepAlive(tcpKeepAlive);
|
channel.socket().setKeepAlive(tcpKeepAlive);
|
||||||
SelectionKey readKey = channel.register(selector, SelectionKey.OP_READ);
|
|
||||||
c = new Connection(channel, System.currentTimeMillis());
|
Reader reader = getReader();
|
||||||
readKey.attach(c);
|
try {
|
||||||
synchronized (connectionList) {
|
reader.startAdd();
|
||||||
connectionList.add(numConnections, c);
|
SelectionKey readKey = reader.registerChannel(channel);
|
||||||
numConnections++;
|
c = new Connection(channel, System.currentTimeMillis());
|
||||||
|
readKey.attach(c);
|
||||||
|
synchronized (connectionList) {
|
||||||
|
connectionList.add(numConnections, c);
|
||||||
|
numConnections++;
|
||||||
|
}
|
||||||
|
if (LOG.isDebugEnabled())
|
||||||
|
LOG.debug("Server connection from " + c.toString() +
|
||||||
|
"; # active connections: " + numConnections +
|
||||||
|
"; # queued calls: " + callQueue.size());
|
||||||
|
} finally {
|
||||||
|
reader.finishAdd();
|
||||||
}
|
}
|
||||||
if (LOG.isDebugEnabled())
|
|
||||||
LOG.debug("Server connection from " + c.toString() +
|
|
||||||
"; # active connections: " + numConnections +
|
|
||||||
"; # queued calls: " + callQueue.size());
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -452,6 +532,14 @@ public abstract class HBaseServer {
|
||||||
LOG.info(getName() + ":Exception in closing listener socket. " + e);
|
LOG.info(getName() + ":Exception in closing listener socket. " + e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
readPool.shutdownNow();
|
||||||
|
}
|
||||||
|
|
||||||
|
// The method that will return the next reader to work with
|
||||||
|
// Simplistic implementation of round robin for now
|
||||||
|
Reader getReader() {
|
||||||
|
currentReader = (currentReader + 1) % readers.length;
|
||||||
|
return readers[currentReader];
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -993,6 +1081,9 @@ public abstract class HBaseServer {
|
||||||
this.handlerCount = handlerCount;
|
this.handlerCount = handlerCount;
|
||||||
this.socketSendBufferSize = 0;
|
this.socketSendBufferSize = 0;
|
||||||
this.maxQueueSize = handlerCount * MAX_QUEUE_SIZE_PER_HANDLER;
|
this.maxQueueSize = handlerCount * MAX_QUEUE_SIZE_PER_HANDLER;
|
||||||
|
this.readThreads = conf.getInt(
|
||||||
|
"ipc.server.read.threadpool.size",
|
||||||
|
10);
|
||||||
this.callQueue = new LinkedBlockingQueue<Call>(maxQueueSize);
|
this.callQueue = new LinkedBlockingQueue<Call>(maxQueueSize);
|
||||||
this.maxIdleTime = 2*conf.getInt("ipc.client.connection.maxidletime", 1000);
|
this.maxIdleTime = 2*conf.getInt("ipc.client.connection.maxidletime", 1000);
|
||||||
this.maxConnectionsToNuke = conf.getInt("ipc.client.kill.max", 10);
|
this.maxConnectionsToNuke = conf.getInt("ipc.client.kill.max", 10);
|
||||||
|
|
Loading…
Reference in New Issue