svn merge -c 1542111 FIXES: HADOOP-9955. RPC idle connection closing is extremely inefficient (daryn)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1542112 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Daryn Sharp 2013-11-14 22:57:17 +00:00
parent 516e657ac7
commit 4495d087a7
4 changed files with 291 additions and 127 deletions

View File

@ -2048,6 +2048,8 @@ Release 0.23.10 - UNRELEASED
HADOOP-9956. RPC listener inefficiently assigns connections to readers (daryn)
HADOOP-9955. RPC idle connection closing is extremely inefficient (daryn)
BUG FIXES
Release 0.23.9 - 2013-07-08

View File

@ -226,4 +226,10 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic {
public static final String IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY = "ipc.client.fallback-to-simple-auth-allowed";
public static final boolean IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT = false;
/** How often the server scans for idle connections */
public static final String IPC_CLIENT_CONNECTION_IDLESCANINTERVAL_KEY =
"ipc.client.connection.idle-scan-interval.ms";
/** Default value for IPC_CLIENT_CONNECTION_IDLESCANINTERVAL_KEY, in milliseconds */
public static final int IPC_CLIENT_CONNECTION_IDLESCANINTERVAL_DEFAULT =
10000;
}

View File

@ -51,11 +51,13 @@ import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import javax.security.sasl.Sasl;
import javax.security.sasl.SaslException;
@ -344,16 +346,6 @@ public abstract class Server {
private int readThreads; // number of read threads
private int readerPendingConnectionQueue; // number of connections to queue per read thread
private Class<? extends Writable> rpcRequestClass; // class used for deserializing the rpc request
private int maxIdleTime; // the maximum idle time after
// which a client may be disconnected
private int thresholdIdleConnections; // the number of idle connections
// after which we will start
// cleaning up idle
// connections
int maxConnectionsToNuke; // the max number of
// connections to nuke
//during a cleanup
protected RpcMetrics rpcMetrics;
protected RpcDetailedMetrics rpcDetailedMetrics;
@ -371,13 +363,10 @@ public abstract class Server {
volatile private boolean running = true; // true while server runs
private BlockingQueue<Call> callQueue; // queued calls
private List<Connection> connectionList =
Collections.synchronizedList(new LinkedList<Connection>());
//maintain a list
//of client connections
// maintains the set of client connections and handles idle timeouts
private ConnectionManager connectionManager;
private Listener listener = null;
private Responder responder = null;
private int numConnections = 0;
private Handler[] handlers = null;
/**
@ -447,8 +436,8 @@ public abstract class Server {
}
@VisibleForTesting
List<Connection> getConnections() {
return connectionList;
Connection[] getConnections() {
return connectionManager.toArray();
}
/**
@ -516,11 +505,6 @@ public abstract class Server {
private Reader[] readers = null;
private int currentReader = 0;
private InetSocketAddress address; //the address we bind at
private Random rand = new Random();
private long lastCleanupRunTime = 0; //the last time when a cleanup connec-
//-tion (for idle connections) ran
private long cleanupInterval = 10000; //the minimum interval between
//two cleanup runs
private int backlogLength = conf.getInt(
CommonConfigurationKeysPublic.IPC_SERVER_LISTEN_QUEUE_SIZE_KEY,
CommonConfigurationKeysPublic.IPC_SERVER_LISTEN_QUEUE_SIZE_DEFAULT);
@ -630,58 +614,12 @@ public abstract class Server {
}
}
}
/** cleanup connections from connectionList. Choose a random range
* to scan and also have a limit on the number of the connections
* that will be cleanedup per run. The criteria for cleanup is the time
* for which the connection was idle. If 'force' is true then all
* connections will be looked at for the cleanup.
*/
private void cleanupConnections(boolean force) {
if (force || numConnections > thresholdIdleConnections) {
long currentTime = Time.now();
if (!force && (currentTime - lastCleanupRunTime) < cleanupInterval) {
return;
}
int start = 0;
int end = numConnections - 1;
if (!force) {
start = rand.nextInt() % numConnections;
end = rand.nextInt() % numConnections;
int temp;
if (end < start) {
temp = start;
start = end;
end = temp;
}
}
int i = start;
int numNuked = 0;
while (i <= end) {
Connection c;
synchronized (connectionList) {
try {
c = connectionList.get(i);
} catch (Exception e) {return;}
}
if (c.timedOut(currentTime)) {
if (LOG.isDebugEnabled())
LOG.debug(getName() + ": disconnecting client " + c.getHostAddress());
closeConnection(c);
numNuked++;
end--;
c = null;
if (!force && numNuked == maxConnectionsToNuke) break;
}
else i++;
}
lastCleanupRunTime = Time.now();
}
}
@Override
public void run() {
LOG.info(getName() + ": starting");
SERVER.set(Server.this);
connectionManager.startIdleScan();
while (running) {
SelectionKey key = null;
try {
@ -705,12 +643,11 @@ public abstract class Server {
// some thread(s) a chance to finish
LOG.warn("Out of Memory in server select", e);
closeCurrentConnection(key, e);
cleanupConnections(true);
connectionManager.closeIdle(true);
try { Thread.sleep(60000); } catch (Exception ie) {}
} catch (Exception e) {
closeCurrentConnection(key, e);
}
cleanupConnections(false);
}
LOG.info("Stopping " + this.getName());
@ -723,10 +660,9 @@ public abstract class Server {
selector= null;
acceptChannel= null;
// clean up all connections
while (!connectionList.isEmpty()) {
closeConnection(connectionList.remove(0));
}
// close all connections
connectionManager.stopIdleScan();
connectionManager.closeAll();
}
}
@ -734,8 +670,6 @@ public abstract class Server {
if (key != null) {
Connection c = (Connection)key.attachment();
if (c != null) {
if (LOG.isDebugEnabled())
LOG.debug(getName() + ": disconnecting client " + c.getHostAddress());
closeConnection(c);
c = null;
}
@ -746,8 +680,7 @@ public abstract class Server {
return (InetSocketAddress)acceptChannel.socket().getLocalSocketAddress();
}
void doAccept(SelectionKey key) throws IOException, OutOfMemoryError {
Connection c = null;
void doAccept(SelectionKey key) throws InterruptedException, IOException, OutOfMemoryError {
ServerSocketChannel server = (ServerSocketChannel) key.channel();
SocketChannel channel;
while ((channel = server.accept()) != null) {
@ -757,25 +690,9 @@ public abstract class Server {
channel.socket().setKeepAlive(true);
Reader reader = getReader();
try {
c = new Connection(channel, Time.now());
synchronized (connectionList) {
connectionList.add(numConnections, c);
numConnections++;
}
Connection c = connectionManager.register(channel);
key.attach(c); // so closeCurrentConnection can get the object
reader.addConnection(c);
if (LOG.isDebugEnabled())
LOG.debug("Server connection from " + c.toString() +
"; # active connections: " + numConnections +
"; # queued calls: " + callQueue.size());
} catch (InterruptedException ie) {
if (running) {
LOG.info(
getName() + ": disconnecting client " + c.getHostAddress() +
" due to unexpected interrupt");
}
closeConnection(c);
}
}
}
@ -803,10 +720,6 @@ public abstract class Server {
count = -1; //so that the (count < 0) block is executed
}
if (count < 0) {
if (LOG.isDebugEnabled())
LOG.debug(getName() + ": disconnecting client " +
c + ". Number of active connections: "+
numConnections);
closeConnection(c);
c = null;
}
@ -1247,12 +1160,6 @@ public abstract class Server {
rpcCount++;
}
private boolean timedOut(long currentTime) {
if (isIdle() && currentTime - lastContact > maxIdleTime)
return true;
return false;
}
private UserGroupInformation getAuthorizedUgi(String authorizedId)
throws InvalidToken, AccessControlException {
if (authMethod == AuthMethod.TOKEN) {
@ -2187,15 +2094,6 @@ public abstract class Server {
CommonConfigurationKeys.IPC_SERVER_RPC_READ_CONNECTION_QUEUE_SIZE_KEY,
CommonConfigurationKeys.IPC_SERVER_RPC_READ_CONNECTION_QUEUE_SIZE_DEFAULT);
this.callQueue = new LinkedBlockingQueue<Call>(maxQueueSize);
this.maxIdleTime = 2 * conf.getInt(
CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY,
CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_DEFAULT);
this.maxConnectionsToNuke = conf.getInt(
CommonConfigurationKeysPublic.IPC_CLIENT_KILL_MAX_KEY,
CommonConfigurationKeysPublic.IPC_CLIENT_KILL_MAX_DEFAULT);
this.thresholdIdleConnections = conf.getInt(
CommonConfigurationKeysPublic.IPC_CLIENT_IDLETHRESHOLD_KEY,
CommonConfigurationKeysPublic.IPC_CLIENT_IDLETHRESHOLD_DEFAULT);
this.secretManager = (SecretManager<TokenIdentifier>) secretManager;
this.authorize =
conf.getBoolean(CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION,
@ -2216,6 +2114,7 @@ public abstract class Server {
// Create the responder here
responder = new Responder();
connectionManager = new ConnectionManager();
if (secretManager != null) {
SaslRpcServer.init(conf);
@ -2274,11 +2173,7 @@ public abstract class Server {
}
private void closeConnection(Connection connection) {
synchronized (connectionList) {
if (connectionList.remove(connection))
numConnections--;
}
connection.close();
connectionManager.close(connection);
}
/**
@ -2533,7 +2428,7 @@ public abstract class Server {
* @return the number of open rpc connections
*/
public int getNumOpenConnections() {
return numConnections;
return connectionManager.size();
}
/**
@ -2643,4 +2538,151 @@ public abstract class Server {
int nBytes = initialRemaining - buf.remaining();
return (nBytes > 0) ? nBytes : ret;
}
/**
 * Keeps the set of open client connections for the enclosing server and
 * closes connections that have been idle too long, either from a timer
 * task or when explicitly asked to (see {@link #closeIdle(boolean)}).
 */
private class ConnectionManager {
  // connection counter; only adjusted when the set really changed
  final private AtomicInteger count = new AtomicInteger();
  final private Set<Connection> connections;
  final private Timer idleScanTimer;
  final private int idleScanThreshold;
  final private int idleScanInterval;
  final private int maxIdleTime;
  final private int maxIdleToClose;

  /**
   * Reads the idle-scan settings from the server configuration and creates
   * the (daemon) timer and the concurrent connection set.
   */
  ConnectionManager() {
    this.idleScanTimer = new Timer(
        "IPC Server idle connection scanner for port " + getPort(), true);
    this.idleScanThreshold = conf.getInt(
        CommonConfigurationKeysPublic.IPC_CLIENT_IDLETHRESHOLD_KEY,
        CommonConfigurationKeysPublic.IPC_CLIENT_IDLETHRESHOLD_DEFAULT);
    this.idleScanInterval = conf.getInt(
        CommonConfigurationKeys.IPC_CLIENT_CONNECTION_IDLESCANINTERVAL_KEY,
        CommonConfigurationKeys.IPC_CLIENT_CONNECTION_IDLESCANINTERVAL_DEFAULT);
    // a connection is considered idle after twice the configured value
    this.maxIdleTime = 2 * conf.getInt(
        CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY,
        CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_DEFAULT);
    this.maxIdleToClose = conf.getInt(
        CommonConfigurationKeysPublic.IPC_CLIENT_KILL_MAX_KEY,
        CommonConfigurationKeysPublic.IPC_CLIENT_KILL_MAX_DEFAULT);
    // the backing map provides concurrent access plus an iterator that is
    // safe to use while the set changes; the concurrency level is the
    // reader threads plus 2 (listener and idle closer threads)
    this.connections = Collections.newSetFromMap(
        new ConcurrentHashMap<Connection,Boolean>(
            maxQueueSize, 0.75f, readThreads+2));
  }

  /** Adds the connection; bumps the counter only if it was not present. */
  private boolean add(Connection connection) {
    if (!connections.add(connection)) {
      return false;
    }
    count.getAndIncrement();
    return true;
  }

  /** Removes the connection; lowers the counter only if it was present. */
  private boolean remove(Connection connection) {
    if (!connections.remove(connection)) {
      return false;
    }
    count.getAndDecrement();
    return true;
  }

  /** @return the current value of the connection counter */
  int size() {
    return count.get();
  }

  /** @return a snapshot array of the tracked connections */
  Connection[] toArray() {
    return connections.toArray(new Connection[0]);
  }

  /**
   * Wraps the channel in a new Connection stamped with the current time and
   * starts tracking it.
   *
   * @param channel the accepted client channel
   * @return the newly created connection
   */
  Connection register(SocketChannel channel) {
    Connection created = new Connection(channel, Time.now());
    add(created);
    if (LOG.isDebugEnabled()) {
      LOG.debug("Server connection from " + created +
          "; # active connections: " + size() +
          "; # queued calls: " + callQueue.size());
    }
    return created;
  }

  /**
   * Stops tracking the connection and closes it.
   *
   * @param connection the connection to close
   * @return true if the connection was tracked (and therefore closed by this
   *         call), false if it had already been removed
   */
  boolean close(Connection connection) {
    // whoever wins the removal does the close, so racing callers cannot
    // close the same connection twice
    if (!remove(connection)) {
      return false;
    }
    if (LOG.isDebugEnabled()) {
      LOG.debug(Thread.currentThread().getName() +
          ": disconnecting client " + connection +
          ". Number of active connections: "+ size());
    }
    connection.close();
    return true;
  }

  /**
   * Closes connections that are idle and whose last contact is older than
   * maxIdleTime. Synchronized so that an explicit call (made on OOM) and the
   * timer task do not run the scan at the same time.
   *
   * @param scanAll when true, every connection is examined and the
   *        threshold and per-scan close limit are ignored
   */
  synchronized void closeIdle(boolean scanAll) {
    final long oldestAllowedContact = Time.now() - maxIdleTime;
    // connections added while iterating may be skipped by the concurrent
    // iterator; they cannot be idle yet and a later scan will see them
    int closedSoFar = 0;
    for (Connection candidate : connections) {
      // a limited scan ends once the count falls below the threshold
      if (!scanAll && size() < idleScanThreshold) {
        break;
      }
      boolean expired = candidate.isIdle()
          && candidate.getLastContact() < oldestAllowedContact;
      if (!expired || !close(candidate)) {
        continue;
      }
      // a limited scan also ends after closing the maximum allowed
      if (!scanAll) {
        closedSoFar++;
        if (closedSoFar == maxIdleToClose) {
          break;
        }
      }
    }
  }

  /** Closes every tracked connection. */
  void closeAll() {
    // iterate over a snapshot rather than the live set so the concurrent
    // iterator cannot skip a connection
    Connection[] snapshot = toArray();
    for (int i = 0; i < snapshot.length; i++) {
      close(snapshot[i]);
    }
  }

  /** Schedules the first idle scan. */
  void startIdleScan() {
    scheduleIdleScanTask();
  }

  /** Cancels the idle scan timer. */
  void stopIdleScan() {
    idleScanTimer.cancel();
  }

  /**
   * Schedules a single idle scan idleScanInterval ms from now, unless the
   * server is no longer running. The task reschedules itself when it ends.
   */
  private void scheduleIdleScanTask() {
    if (running) {
      idleScanTimer.schedule(new TimerTask() {
        @Override
        public void run() {
          if (running) {
            if (LOG.isDebugEnabled()) {
              LOG.debug(Thread.currentThread().getName()+": task running");
            }
            try {
              closeIdle(false);
            } finally {
              // reschedule by hand so the delay is measured from the end
              // of this scan instead of from its start
              scheduleIdleScanTask();
            }
          }
        }
      }, idleScanInterval);
    }
  }
}
}

View File

@ -46,12 +46,15 @@ import java.util.List;
import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import javax.net.SocketFactory;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
@ -68,8 +71,10 @@ import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto;
import org.apache.hadoop.net.ConnectTimeoutException;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.log4j.Level;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
@ -83,7 +88,7 @@ public class TestIPC {
public static final Log LOG =
LogFactory.getLog(TestIPC.class);
final private static Configuration conf = new Configuration();
private static Configuration conf;
final static private int PING_INTERVAL = 1000;
final static private int MIN_SLEEP_TIME = 1000;
/**
@ -93,7 +98,9 @@ public class TestIPC {
static boolean WRITABLE_FAULTS_ENABLED = true;
static int WRITABLE_FAULTS_SLEEP = 0;
static {
/**
 * Replaces the shared {@code conf} with a new Configuration and sets its
 * ping interval to PING_INTERVAL, so settings changed by one test are not
 * carried over through {@code conf} to the next.
 */
@Before
public void setupConf() {
conf = new Configuration();
Client.setPingInterval(conf, PING_INTERVAL);
}
@ -759,6 +766,113 @@ public class TestIPC {
server.stop();
}
/**
 * Checks the server's idle connection scanning: with the idle threshold set
 * to 0, idle connections are expected to be closed killMax at a time on each
 * scan interval once they have been idle for maxIdle*2, while the connection
 * whose call is still blocked in the handler stays open until that call is
 * released.
 */
@Test(timeout=30000)
public void testConnectionIdleTimeouts() throws Exception {
  ((Log4JLogger)Server.LOG).getLogger().setLevel(Level.DEBUG);
  final int maxIdle = 1000;
  final int cleanupInterval = maxIdle*3/4; // stagger cleanups
  final int killMax = 3;
  final int clients = 1 + killMax*2; // 1 to block, 2 batches to kill

  conf.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY, maxIdle);
  conf.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_IDLETHRESHOLD_KEY, 0);
  conf.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_KILL_MAX_KEY, killMax);
  conf.setInt(CommonConfigurationKeys.IPC_CLIENT_CONNECTION_IDLESCANINTERVAL_KEY, cleanupInterval);

  // firstCallBarrier holds the first call until the test releases it;
  // callBarrier holds the remaining calls until the test thread joins in
  final CyclicBarrier firstCallBarrier = new CyclicBarrier(2);
  final CyclicBarrier callBarrier = new CyclicBarrier(clients);
  final CountDownLatch allCallLatch = new CountDownLatch(clients);
  final AtomicBoolean error = new AtomicBoolean();

  final TestServer server = new TestServer(clients, false);
  Thread[] threads = new Thread[clients];
  try {
    server.callListener = new Runnable(){
      AtomicBoolean first = new AtomicBoolean(true);
      @Override
      public void run() {
        try {
          allCallLatch.countDown();
          // block first call
          if (first.compareAndSet(true, false)) {
            firstCallBarrier.await();
          } else {
            callBarrier.await();
          }
        } catch (Throwable t) {
          LOG.error(t);
          error.set(true);
        }
      }
    };
    server.start();

    // start client
    final CountDownLatch callReturned = new CountDownLatch(clients-1);
    final InetSocketAddress addr = NetUtils.getConnectAddress(server);
    final Configuration clientConf = new Configuration();
    clientConf.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY, 10000);
    for (int i=0; i < clients; i++) {
      threads[i] = new Thread(new Runnable(){
        @Override
        public void run() {
          Client client = new Client(LongWritable.class, clientConf);
          try {
            client.call(new LongWritable(Thread.currentThread().getId()),
                addr, null, null, 0, clientConf);
            callReturned.countDown();
            Thread.sleep(10000);
          } catch (IOException e) {
            LOG.error(e);
          } catch (InterruptedException e) {
            // expected: the test interrupts the client threads on cleanup
          }
        }
      });
      threads[i].start();
    }

    // all calls blocked in handler so all connections made
    allCallLatch.await();
    assertFalse(error.get());
    assertEquals(clients, server.getNumOpenConnections());

    // wake up blocked calls and wait for client call to return, no
    // connections should have closed
    callBarrier.await();
    callReturned.await();
    assertEquals(clients, server.getNumOpenConnections());

    // server won't close till maxIdle*2, so give scanning thread time to
    // be almost ready to close idle connection.  after which it should
    // close max connections on every cleanupInterval
    Thread.sleep(maxIdle*2-cleanupInterval);
    for (int i=clients; i > 1; i -= killMax) {
      Thread.sleep(cleanupInterval);
      assertFalse(error.get());
      assertEquals(i, server.getNumOpenConnections());
    }

    // connection for the first blocked call should still be open
    Thread.sleep(cleanupInterval);
    assertFalse(error.get());
    assertEquals(1, server.getNumOpenConnections());

    // wake up call and ensure connection times out
    firstCallBarrier.await();
    Thread.sleep(maxIdle*2);
    assertFalse(error.get());
    assertEquals(0, server.getNumOpenConnections());
  } finally {
    // wake every client thread before anything else
    for (Thread t : threads) {
      if (t != null) {
        t.interrupt();
      }
    }
    // stop the server exactly once (previously this ran once per client
    // thread, inside the loop), and do it before joining so the server is
    // stopped even if a join is interrupted.
    // NOTE(review): stopping first presumably also releases a client that
    // is still blocked inside call() -- confirm against Client.call
    server.stop();
    for (Thread t : threads) {
      if (t != null) {
        t.join();
      }
    }
  }
}
/**
* Make a call from a client and verify if header info is changed in server side
*/
@ -768,7 +882,7 @@ public class TestIPC {
client.call(new LongWritable(RANDOM.nextLong()),
addr, null, null, MIN_SLEEP_TIME, serviceClass, conf);
Connection connection = server.getConnections().get(0);
Connection connection = server.getConnections()[0];
int serviceClass2 = connection.getServiceClass();
assertFalse(noChanged ^ serviceClass == serviceClass2);
client.stop();