diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index d25c4f1c2c2..47e9bd4c21c 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -2048,6 +2048,8 @@ Release 0.23.10 - UNRELEASED HADOOP-9956. RPC listener inefficiently assigns connections to readers (daryn) + HADOOP-9955. RPC idle connection closing is extremely inefficient (daryn) + BUG FIXES Release 0.23.9 - 2013-07-08 diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java index 06c1a734556..e52659dd6e7 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java @@ -226,4 +226,10 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic { public static final String IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY = "ipc.client.fallback-to-simple-auth-allowed"; public static final boolean IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT = false; + /** How often the server scans for idle connections */ + public static final String IPC_CLIENT_CONNECTION_IDLESCANINTERVAL_KEY = + "ipc.client.connection.idle-scan-interval.ms"; + /** Default value for IPC_SERVER_CONNECTION_IDLE_SCAN_INTERVAL_KEY */ + public static final int IPC_CLIENT_CONNECTION_IDLESCANINTERVAL_DEFAULT = + 10000; } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java index 2760bc67e4c..c8c3403df80 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java @@ -51,11 +51,13 @@ import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.Random; import java.util.Set; +import java.util.Timer; +import java.util.TimerTask; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicInteger; import javax.security.sasl.Sasl; import javax.security.sasl.SaslException; @@ -344,16 +346,6 @@ public abstract class Server { private int readThreads; // number of read threads private int readerPendingConnectionQueue; // number of connections to queue per read thread private Class rpcRequestClass; // class used for deserializing the rpc request - private int maxIdleTime; // the maximum idle time after - // which a client may be disconnected - private int thresholdIdleConnections; // the number of idle connections - // after which we will start - // cleaning up idle - // connections - int maxConnectionsToNuke; // the max number of - // connections to nuke - //during a cleanup - protected RpcMetrics rpcMetrics; protected RpcDetailedMetrics rpcDetailedMetrics; @@ -371,13 +363,10 @@ public abstract class Server { volatile private boolean running = true; // true while server runs private BlockingQueue callQueue; // queued calls - private List connectionList = - Collections.synchronizedList(new LinkedList()); - //maintain a list - //of client connections + // maintains the set of client connections and handles idle timeouts + private ConnectionManager connectionManager; private Listener listener = null; private Responder responder = null; - private int numConnections = 0; private Handler[] handlers = null; /** @@ -447,8 +436,8 @@ public abstract class Server { } @VisibleForTesting - List getConnections() { - return connectionList; + Connection[] getConnections() { + return connectionManager.toArray(); } /** @@ -516,11 +505,6 @@ public abstract class Server { private Reader[] readers = null; private int currentReader = 0; private InetSocketAddress address; //the address we bind at - private Random rand = new Random(); - private long lastCleanupRunTime = 0; //the last time when a cleanup connec- - //-tion (for idle connections) ran - private long cleanupInterval = 10000; //the minimum interval between - //two cleanup runs private int backlogLength = conf.getInt( CommonConfigurationKeysPublic.IPC_SERVER_LISTEN_QUEUE_SIZE_KEY, CommonConfigurationKeysPublic.IPC_SERVER_LISTEN_QUEUE_SIZE_DEFAULT); @@ -630,58 +614,12 @@ public abstract class Server { } } } - /** cleanup connections from connectionList. Choose a random range - * to scan and also have a limit on the number of the connections - * that will be cleanedup per run. The criteria for cleanup is the time - * for which the connection was idle. If 'force' is true then all - * connections will be looked at for the cleanup. - */ - private void cleanupConnections(boolean force) { - if (force || numConnections > thresholdIdleConnections) { - long currentTime = Time.now(); - if (!force && (currentTime - lastCleanupRunTime) < cleanupInterval) { - return; - } - int start = 0; - int end = numConnections - 1; - if (!force) { - start = rand.nextInt() % numConnections; - end = rand.nextInt() % numConnections; - int temp; - if (end < start) { - temp = start; - start = end; - end = temp; - } - } - int i = start; - int numNuked = 0; - while (i <= end) { - Connection c; - synchronized (connectionList) { - try { - c = connectionList.get(i); - } catch (Exception e) {return;} - } - if (c.timedOut(currentTime)) { - if (LOG.isDebugEnabled()) - LOG.debug(getName() + ": disconnecting client " + c.getHostAddress()); - closeConnection(c); - numNuked++; - end--; - c = null; - if (!force && numNuked == maxConnectionsToNuke) break; - } - else i++; - } - lastCleanupRunTime = Time.now(); - } - } @Override public void run() { LOG.info(getName() + ": starting"); SERVER.set(Server.this); + connectionManager.startIdleScan(); while (running) { SelectionKey key = null; try { @@ -705,12 +643,11 @@ public abstract class Server { // some thread(s) a chance to finish LOG.warn("Out of Memory in server select", e); closeCurrentConnection(key, e); - cleanupConnections(true); + connectionManager.closeIdle(true); try { Thread.sleep(60000); } catch (Exception ie) {} } catch (Exception e) { closeCurrentConnection(key, e); } - cleanupConnections(false); } LOG.info("Stopping " + this.getName()); @@ -723,10 +660,9 @@ public abstract class Server { selector= null; acceptChannel= null; - // clean up all connections - while (!connectionList.isEmpty()) { - closeConnection(connectionList.remove(0)); - } + // close all connections + connectionManager.stopIdleScan(); + connectionManager.closeAll(); } } @@ -734,8 +670,6 @@ public abstract class Server { if (key != null) { Connection c = (Connection)key.attachment(); if (c != null) { - if (LOG.isDebugEnabled()) - LOG.debug(getName() + ": disconnecting client " + c.getHostAddress()); closeConnection(c); c = null; } @@ -746,8 +680,7 @@ public abstract class Server { return (InetSocketAddress)acceptChannel.socket().getLocalSocketAddress(); } - void doAccept(SelectionKey key) throws IOException, OutOfMemoryError { - Connection c = null; + void doAccept(SelectionKey key) throws InterruptedException, IOException, OutOfMemoryError { ServerSocketChannel server = (ServerSocketChannel) key.channel(); SocketChannel channel; while ((channel = server.accept()) != null) { @@ -757,25 +690,9 @@ public abstract class Server { channel.socket().setKeepAlive(true); Reader reader = getReader(); - try { - c = new Connection(channel, Time.now()); - synchronized (connectionList) { - connectionList.add(numConnections, c); - numConnections++; - } - reader.addConnection(c); - if (LOG.isDebugEnabled()) - LOG.debug("Server connection from " + c.toString() + - "; # active connections: " + numConnections + - "; # queued calls: " + callQueue.size()); - } catch (InterruptedException ie) { - if (running) { - LOG.info( - getName() + ": disconnecting client " + c.getHostAddress() + - " due to unexpected interrupt"); - } - closeConnection(c); - } + Connection c = connectionManager.register(channel); + key.attach(c); // so closeCurrentConnection can get the object + reader.addConnection(c); } } @@ -803,10 +720,6 @@ public abstract class Server { count = -1; //so that the (count < 0) block is executed } if (count < 0) { - if (LOG.isDebugEnabled()) - LOG.debug(getName() + ": disconnecting client " + - c + ". Number of active connections: "+ - numConnections); closeConnection(c); c = null; } @@ -1247,12 +1160,6 @@ public abstract class Server { rpcCount++; } - private boolean timedOut(long currentTime) { - if (isIdle() && currentTime - lastContact > maxIdleTime) - return true; - return false; - } - private UserGroupInformation getAuthorizedUgi(String authorizedId) throws InvalidToken, AccessControlException { if (authMethod == AuthMethod.TOKEN) { @@ -2187,15 +2094,6 @@ public abstract class Server { CommonConfigurationKeys.IPC_SERVER_RPC_READ_CONNECTION_QUEUE_SIZE_KEY, CommonConfigurationKeys.IPC_SERVER_RPC_READ_CONNECTION_QUEUE_SIZE_DEFAULT); this.callQueue = new LinkedBlockingQueue(maxQueueSize); - this.maxIdleTime = 2 * conf.getInt( - CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY, - CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_DEFAULT); - this.maxConnectionsToNuke = conf.getInt( - CommonConfigurationKeysPublic.IPC_CLIENT_KILL_MAX_KEY, - CommonConfigurationKeysPublic.IPC_CLIENT_KILL_MAX_DEFAULT); - this.thresholdIdleConnections = conf.getInt( - CommonConfigurationKeysPublic.IPC_CLIENT_IDLETHRESHOLD_KEY, - CommonConfigurationKeysPublic.IPC_CLIENT_IDLETHRESHOLD_DEFAULT); this.secretManager = (SecretManager) secretManager; this.authorize = conf.getBoolean(CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION, @@ -2216,6 +2114,7 @@ public abstract class Server { // Create the responder here responder = new Responder(); + connectionManager = new ConnectionManager(); if (secretManager != null) { SaslRpcServer.init(conf); @@ -2274,11 +2173,7 @@ public abstract class Server { } private void closeConnection(Connection connection) { - synchronized (connectionList) { - if (connectionList.remove(connection)) - numConnections--; - } - connection.close(); + connectionManager.close(connection); } /** @@ -2533,7 +2428,7 @@ public abstract class Server { * @return the number of open rpc connections */ public int getNumOpenConnections() { - return numConnections; + return connectionManager.size(); } /** @@ -2643,4 +2538,151 @@ public abstract class Server { int nBytes = initialRemaining - buf.remaining(); return (nBytes > 0) ? nBytes : ret; } + + private class ConnectionManager { + final private AtomicInteger count = new AtomicInteger(); + final private Set connections; + + final private Timer idleScanTimer; + final private int idleScanThreshold; + final private int idleScanInterval; + final private int maxIdleTime; + final private int maxIdleToClose; + + ConnectionManager() { + this.idleScanTimer = new Timer( + "IPC Server idle connection scanner for port " + getPort(), true); + this.idleScanThreshold = conf.getInt( + CommonConfigurationKeysPublic.IPC_CLIENT_IDLETHRESHOLD_KEY, + CommonConfigurationKeysPublic.IPC_CLIENT_IDLETHRESHOLD_DEFAULT); + this.idleScanInterval = conf.getInt( + CommonConfigurationKeys.IPC_CLIENT_CONNECTION_IDLESCANINTERVAL_KEY, + CommonConfigurationKeys.IPC_CLIENT_CONNECTION_IDLESCANINTERVAL_DEFAULT); + this.maxIdleTime = 2 * conf.getInt( + CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY, + CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_DEFAULT); + this.maxIdleToClose = conf.getInt( + CommonConfigurationKeysPublic.IPC_CLIENT_KILL_MAX_KEY, + CommonConfigurationKeysPublic.IPC_CLIENT_KILL_MAX_DEFAULT); + // create a set with concurrency -and- a thread-safe iterator, add 2 + // for listener and idle closer threads + this.connections = Collections.newSetFromMap( + new ConcurrentHashMap( + maxQueueSize, 0.75f, readThreads+2)); + } + + private boolean add(Connection connection) { + boolean added = connections.add(connection); + if (added) { + count.getAndIncrement(); + } + return added; + } + + private boolean remove(Connection connection) { + boolean removed = connections.remove(connection); + if (removed) { + count.getAndDecrement(); + } + return removed; + } + + int size() { + return count.get(); + } + + Connection[] toArray() { + return connections.toArray(new Connection[0]); + } + + Connection register(SocketChannel channel) { + Connection connection = new Connection(channel, Time.now()); + add(connection); + if (LOG.isDebugEnabled()) { + LOG.debug("Server connection from " + connection + + "; # active connections: " + size() + + "; # queued calls: " + callQueue.size()); + } + return connection; + } + + boolean close(Connection connection) { + boolean exists = remove(connection); + if (exists) { + if (LOG.isDebugEnabled()) { + LOG.debug(Thread.currentThread().getName() + + ": disconnecting client " + connection + + ". Number of active connections: "+ size()); + } + // only close if actually removed to avoid double-closing due + // to possible races + connection.close(); + } + return exists; + } + + // synch'ed to avoid explicit invocation upon OOM from colliding with + // timer task firing + synchronized void closeIdle(boolean scanAll) { + long minLastContact = Time.now() - maxIdleTime; + // concurrent iterator might miss new connections added + // during the iteration, but that's ok because they won't + // be idle yet anyway and will be caught on next scan + int closed = 0; + for (Connection connection : connections) { + // stop if connections dropped below threshold unless scanning all + if (!scanAll && size() < idleScanThreshold) { + break; + } + // stop if not scanning all and max connections are closed + if (connection.isIdle() && + connection.getLastContact() < minLastContact && + close(connection) && + !scanAll && (++closed == maxIdleToClose)) { + break; + } + } + } + + void closeAll() { + // use a copy of the connections to be absolutely sure the concurrent + // iterator doesn't miss a connection + for (Connection connection : toArray()) { + close(connection); + } + } + + void startIdleScan() { + scheduleIdleScanTask(); + } + + void stopIdleScan() { + idleScanTimer.cancel(); + } + + private void scheduleIdleScanTask() { + if (!running) { + return; + } + TimerTask idleScanTask = new TimerTask(){ + @Override + public void run() { + if (!running) { + return; + } + if (LOG.isDebugEnabled()) { + LOG.debug(Thread.currentThread().getName()+": task running"); + } + try { + closeIdle(false); + } finally { + // explicitly reschedule so next execution occurs relative + // to the end of this scan, not the beginning + scheduleIdleScanTask(); + } + } + }; + idleScanTimer.schedule(idleScanTask, idleScanInterval); + } + } } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java index a84232e6c61..02516a183aa 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java @@ -46,12 +46,15 @@ import java.util.List; import java.util.Random; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import javax.net.SocketFactory; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.commons.logging.impl.Log4JLogger; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; @@ -68,8 +71,10 @@ import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto; import org.apache.hadoop.net.ConnectTimeoutException; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.util.StringUtils; +import org.apache.log4j.Level; import org.junit.Assert; import org.junit.Assume; +import org.junit.Before; import org.junit.Test; import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; @@ -83,7 +88,7 @@ public class TestIPC { public static final Log LOG = LogFactory.getLog(TestIPC.class); - final private static Configuration conf = new Configuration(); + private static Configuration conf; final static private int PING_INTERVAL = 1000; final static private int MIN_SLEEP_TIME = 1000; /** @@ -93,7 +98,9 @@ public class TestIPC { static boolean WRITABLE_FAULTS_ENABLED = true; static int WRITABLE_FAULTS_SLEEP = 0; - static { + @Before + public void setupConf() { + conf = new Configuration(); Client.setPingInterval(conf, PING_INTERVAL); } @@ -759,6 +766,113 @@ public class TestIPC { server.stop(); } + @Test(timeout=30000) + public void testConnectionIdleTimeouts() throws Exception { + ((Log4JLogger)Server.LOG).getLogger().setLevel(Level.DEBUG); + final int maxIdle = 1000; + final int cleanupInterval = maxIdle*3/4; // stagger cleanups + final int killMax = 3; + final int clients = 1 + killMax*2; // 1 to block, 2 batches to kill + + conf.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY, maxIdle); + conf.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_IDLETHRESHOLD_KEY, 0); + conf.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_KILL_MAX_KEY, killMax); + conf.setInt(CommonConfigurationKeys.IPC_CLIENT_CONNECTION_IDLESCANINTERVAL_KEY, cleanupInterval); + + final CyclicBarrier firstCallBarrier = new CyclicBarrier(2); + final CyclicBarrier callBarrier = new CyclicBarrier(clients); + final CountDownLatch allCallLatch = new CountDownLatch(clients); + final AtomicBoolean error = new AtomicBoolean(); + + final TestServer server = new TestServer(clients, false); + Thread[] threads = new Thread[clients]; + try { + server.callListener = new Runnable(){ + AtomicBoolean first = new AtomicBoolean(true); + @Override + public void run() { + try { + allCallLatch.countDown(); + // block first call + if (first.compareAndSet(true, false)) { + firstCallBarrier.await(); + } else { + callBarrier.await(); + } + } catch (Throwable t) { + LOG.error(t); + error.set(true); + } + } + }; + server.start(); + + // start client + final CountDownLatch callReturned = new CountDownLatch(clients-1); + final InetSocketAddress addr = NetUtils.getConnectAddress(server); + final Configuration clientConf = new Configuration(); + clientConf.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY, 10000); + for (int i=0; i < clients; i++) { + threads[i] = new Thread(new Runnable(){ + @Override + public void run() { + Client client = new Client(LongWritable.class, clientConf); + try { + client.call(new LongWritable(Thread.currentThread().getId()), + addr, null, null, 0, clientConf); + callReturned.countDown(); + Thread.sleep(10000); + } catch (IOException e) { + LOG.error(e); + } catch (InterruptedException e) { + } + } + }); + threads[i].start(); + } + + // all calls blocked in handler so all connections made + allCallLatch.await(); + assertFalse(error.get()); + assertEquals(clients, server.getNumOpenConnections()); + + // wake up blocked calls and wait for client call to return, no + // connections should have closed + callBarrier.await(); + callReturned.await(); + assertEquals(clients, server.getNumOpenConnections()); + + // server won't close till maxIdle*2, so give scanning thread time to + // be almost ready to close idle connection. after which it should + // close max connections on every cleanupInterval + Thread.sleep(maxIdle*2-cleanupInterval); + for (int i=clients; i > 1; i -= killMax) { + Thread.sleep(cleanupInterval); + assertFalse(error.get()); + assertEquals(i, server.getNumOpenConnections()); + } + + // connection for the first blocked call should still be open + Thread.sleep(cleanupInterval); + assertFalse(error.get()); + assertEquals(1, server.getNumOpenConnections()); + + // wake up call and ensure connection times out + firstCallBarrier.await(); + Thread.sleep(maxIdle*2); + assertFalse(error.get()); + assertEquals(0, server.getNumOpenConnections()); + } finally { + for (Thread t : threads) { + if (t != null) { + t.interrupt(); + t.join(); + } + server.stop(); + } + } + } + /** * Make a call from a client and verify if header info is changed in server side */ @@ -768,7 +882,7 @@ public class TestIPC { client.call(new LongWritable(RANDOM.nextLong()), addr, null, null, MIN_SLEEP_TIME, serviceClass, conf); - Connection connection = server.getConnections().get(0); + Connection connection = server.getConnections()[0]; int serviceClass2 = connection.getServiceClass(); assertFalse(noChanged ^ serviceClass == serviceClass2); client.stop();