diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMBean.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMBean.java index 3cde5e5b93c..a4469a3025a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMBean.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMBean.java @@ -76,6 +76,21 @@ public interface FederationRPCMBean { */ int getRpcClientNumActiveConnections(); + /** + * Get the number of idle RPC connections between the Router and the NNs. + * @return Number of idle RPC connections between the Router and the NNs. + */ + int getRpcClientNumIdleConnections(); + + /** + * Get the number of recently active RPC connections between + * the Router and the NNs. + * + * @return Number of recently active RPC connections between + * the Router and the NNs. + */ + int getRpcClientNumActiveConnectionsRecently(); + /** * Get the number of RPC connections to be created. * @return Number of RPC connections to be created. diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMetrics.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMetrics.java index 887d50bf3d5..1e6aa8050d9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMetrics.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMetrics.java @@ -208,6 +208,16 @@ public int getRpcClientNumActiveConnections() { return rpcServer.getRPCClient().getNumActiveConnections(); } + @Override + public int getRpcClientNumIdleConnections() { + return rpcServer.getRPCClient().getNumIdleConnections(); + } + + @Override + public int getRpcClientNumActiveConnectionsRecently() { + return rpcServer.getRPCClient().getNumActiveConnectionsRecently(); + } + @Override public int getRpcClientNumCreatingConnections() { return rpcServer.getRPCClient().getNumCreatingConnections(); diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionContext.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionContext.java index 02a3dbeb4ea..9a5434b91ce 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionContext.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionContext.java @@ -18,9 +18,13 @@ package org.apache.hadoop.hdfs.server.federation.router; import java.net.InetSocketAddress; +import java.util.concurrent.TimeUnit; import org.apache.hadoop.hdfs.NameNodeProxiesClient.ProxyAndInfo; import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.util.Time; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Context to track a connection in a {@link ConnectionPool}. When a client uses @@ -36,13 +40,19 @@ */ public class ConnectionContext { + private static final Logger LOG = + LoggerFactory.getLogger(ConnectionContext.class); + /** Client for the connection. */ private final ProxyAndInfo client; /** How many threads are using this connection. */ private int numThreads = 0; /** If the connection is closed. */ private boolean closed = false; - + /** Last timestamp the connection was active. */ + private long lastActiveTs = 0; + /** The connection's active status would expire after this window. */ + private final static long ACTIVE_WINDOW_TIME = TimeUnit.SECONDS.toMillis(30); public ConnectionContext(ProxyAndInfo connection) { this.client = connection; @@ -57,6 +67,16 @@ public synchronized boolean isActive() { return this.numThreads > 0; } + /** + * Check if the connection is/was active recently. + * + * @return True if the connection is active or + * was active in the past period of time. + */ + public synchronized boolean isActiveRecently() { + return Time.monotonicNow() - this.lastActiveTs <= ACTIVE_WINDOW_TIME; + } + /** * Check if the connection is closed. * @@ -83,30 +103,41 @@ public synchronized boolean isUsable() { */ public synchronized ProxyAndInfo getClient() { this.numThreads++; + this.lastActiveTs = Time.monotonicNow(); return this.client; } /** - * Release this connection. If the connection was closed, close the proxy. - * Otherwise, mark the connection as not used by us anymore. + * Release this connection. */ public synchronized void release() { - if (--this.numThreads == 0 && this.closed) { - close(); + if (this.numThreads > 0) { + this.numThreads--; } } /** - * We will not use this connection anymore. If it's not being used, we close - * it. Otherwise, we let release() do it once we are done with it. + * Close a connection. Only idle connections can be closed since + * the RPC proxy would be shut down immediately. + * + * @param force whether the connection should be closed anyway. */ - public synchronized void close() { - this.closed = true; - if (this.numThreads == 0) { - Object proxy = this.client.getProxy(); - // Nobody should be using this anymore so it should close right away - RPC.stopProxy(proxy); + public synchronized void close(boolean force) { + if (!force && this.numThreads > 0) { + // this is an erroneous case but we have to close the connection + // anyway since there will be connection leak if we don't do so + // the connection has been moved out of the pool + LOG.error("Active connection with {} handlers will be closed", + this.numThreads); } + this.closed = true; + Object proxy = this.client.getProxy(); + // Nobody should be using this anymore so it should close right away + RPC.stopProxy(proxy); + } + + public synchronized void close() { + close(false); } @Override diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java index 9ec3b54ed50..b773a79948e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java @@ -281,6 +281,42 @@ public int getNumActiveConnections() { return total; } + /** + * Get number of idle connections. + * + * @return Number of active connections. + */ + public int getNumIdleConnections() { + int total = 0; + readLock.lock(); + try { + for (ConnectionPool pool : this.pools.values()) { + total += pool.getNumIdleConnections(); + } + } finally { + readLock.unlock(); + } + return total; + } + + /** + * Get number of recently active connections. + * + * @return Number of recently active connections. + */ + public int getNumActiveConnectionsRecently() { + int total = 0; + readLock.lock(); + try { + for (ConnectionPool pool : this.pools.values()) { + total += pool.getNumActiveConnectionsRecently(); + } + } finally { + readLock.unlock(); + } + return total; + } + /** * Get the number of connections to be created. * @@ -327,12 +363,21 @@ void cleanup(ConnectionPool pool) { // Check if the pool hasn't been active in a while or not 50% are used long timeSinceLastActive = Time.now() - pool.getLastActiveTime(); int total = pool.getNumConnections(); - int active = pool.getNumActiveConnections(); + // Active is a transient status in many cases for a connection since + // the handler thread uses the connection very quickly. Thus the number + // of connections with handlers using at the call time is constantly low. + // Recently active is more lasting status and it shows how many + // connections have been used with a recent time period. (i.e. 30 seconds) + int active = pool.getNumActiveConnectionsRecently(); float poolMinActiveRatio = pool.getMinActiveRatio(); if (timeSinceLastActive > connectionCleanupPeriodMs || active < poolMinActiveRatio * total) { - // Remove and close 1 connection - List conns = pool.removeConnections(1); + // Be greedy here to close as many connections as possible in one shot + // The number should at least be 1 + int targetConnectionsCount = Math.max(1, + (int)(poolMinActiveRatio * total) - active); + List conns = + pool.removeConnections(targetConnectionsCount); for (ConnectionContext conn : conns) { conn.close(); } @@ -414,7 +459,7 @@ public void run() { ConnectionPool pool = this.queue.take(); try { int total = pool.getNumConnections(); - int active = pool.getNumActiveConnections(); + int active = pool.getNumActiveConnectionsRecently(); float poolMinActiveRatio = pool.getMinActiveRatio(); if (pool.getNumConnections() < pool.getMaxSize() && active >= poolMinActiveRatio * total) { diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPool.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPool.java index 52e7cebd260..827e62ce3ef 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPool.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPool.java @@ -252,19 +252,23 @@ public synchronized void addConnection(ConnectionContext conn) { */ public synchronized List removeConnections(int num) { List removed = new LinkedList<>(); - - // Remove and close the last connection - List tmpConnections = new ArrayList<>(); - for (int i=0; i this.minSize) { + int targetCount = Math.min(num, this.connections.size() - this.minSize); + // Remove and close targetCount of connections + List tmpConnections = new ArrayList<>(); + for (int i = 0; i < this.connections.size(); i++) { + ConnectionContext conn = this.connections.get(i); + // Only pick idle connections to close + if (removed.size() < targetCount && conn.isUsable()) { + removed.add(conn); + } else { + tmpConnections.add(conn); + } } + this.connections = tmpConnections; } - this.connections = tmpConnections; - + LOG.debug("Expected to remove {} connection " + + "and actually removed {} connections", num, removed.size()); return removed; } @@ -278,7 +282,7 @@ protected synchronized void close() { this.connectionPoolId, timeSinceLastActive); for (ConnectionContext connection : this.connections) { - connection.close(); + connection.close(true); } this.connections.clear(); } @@ -309,6 +313,39 @@ protected int getNumActiveConnections() { return ret; } + /** + * Number of usable i.e. no active thread connections. + * + * @return Number of idle connections + */ + protected int getNumIdleConnections() { + int ret = 0; + + List tmpConnections = this.connections; + for (ConnectionContext conn : tmpConnections) { + if (conn.isUsable()) { + ret++; + } + } + return ret; + } + + /** + * Number of active connections recently in the pool. + * + * @return Number of active connections recently. + */ + protected int getNumActiveConnectionsRecently() { + int ret = 0; + List tmpConnections = this.connections; + for (ConnectionContext conn : tmpConnections) { + if (conn.isActiveRecently()) { + ret++; + } + } + return ret; + } + /** * Get the last time the connection pool was used. * @@ -331,12 +368,18 @@ public String toString() { public String getJSON() { final Map info = new LinkedHashMap<>(); info.put("active", Integer.toString(getNumActiveConnections())); + info.put("recent_active", + Integer.toString(getNumActiveConnectionsRecently())); + info.put("idle", Integer.toString(getNumIdleConnections())); info.put("total", Integer.toString(getNumConnections())); if (LOG.isDebugEnabled()) { List tmpConnections = this.connections; for (int i=0; i