HDFS-15757 RBF: Improving Router Connection Management (#2651)

This commit is contained in:
lfengnan 2021-05-19 10:53:42 -07:00 committed by GitHub
parent 2960d83c25
commit 43bf009112
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 209 additions and 39 deletions

View File

@ -76,6 +76,21 @@ public interface FederationRPCMBean {
*/
int getRpcClientNumActiveConnections();
/**
* Get the number of idle RPC connections between the Router and the NNs.
* @return Number of idle RPC connections between the Router and the NNs.
*/
int getRpcClientNumIdleConnections();
/**
* Get the number of recently active RPC connections between
* the Router and the NNs.
*
* @return Number of recently active RPC connections between
* the Router and the NNs.
*/
int getRpcClientNumActiveConnectionsRecently();
/**
* Get the number of RPC connections to be created.
* @return Number of RPC connections to be created.

View File

@ -208,6 +208,16 @@ public class FederationRPCMetrics implements FederationRPCMBean {
return rpcServer.getRPCClient().getNumActiveConnections();
}
@Override
public int getRpcClientNumIdleConnections() {
return rpcServer.getRPCClient().getNumIdleConnections();
}
@Override
public int getRpcClientNumActiveConnectionsRecently() {
return rpcServer.getRPCClient().getNumActiveConnectionsRecently();
}
@Override
public int getRpcClientNumCreatingConnections() {
return rpcServer.getRPCClient().getNumCreatingConnections();

View File

@ -18,9 +18,13 @@
package org.apache.hadoop.hdfs.server.federation.router;
import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hdfs.NameNodeProxiesClient.ProxyAndInfo;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Context to track a connection in a {@link ConnectionPool}. When a client uses
@ -36,13 +40,19 @@ import org.apache.hadoop.ipc.RPC;
*/
public class ConnectionContext {
private static final Logger LOG =
LoggerFactory.getLogger(ConnectionContext.class);
/** Client for the connection. */
private final ProxyAndInfo<?> client;
/** How many threads are using this connection. */
private int numThreads = 0;
/** If the connection is closed. */
private boolean closed = false;
/** Last timestamp the connection was active. */
private long lastActiveTs = 0;
/** The connection's active status would expire after this window. */
private final static long ACTIVE_WINDOW_TIME = TimeUnit.SECONDS.toMillis(30);
public ConnectionContext(ProxyAndInfo<?> connection) {
this.client = connection;
@ -57,6 +67,16 @@ public class ConnectionContext {
return this.numThreads > 0;
}
/**
* Check if the connection is/was active recently.
*
* @return True if the connection is active or
* was active in the past period of time.
*/
public synchronized boolean isActiveRecently() {
return Time.monotonicNow() - this.lastActiveTs <= ACTIVE_WINDOW_TIME;
}
/**
* Check if the connection is closed.
*
@ -83,30 +103,41 @@ public class ConnectionContext {
*/
public synchronized ProxyAndInfo<?> getClient() {
this.numThreads++;
this.lastActiveTs = Time.monotonicNow();
return this.client;
}
/**
* Release this connection. If the connection was closed, close the proxy.
* Otherwise, mark the connection as not used by us anymore.
* Release this connection.
*/
public synchronized void release() {
if (--this.numThreads == 0 && this.closed) {
close();
if (this.numThreads > 0) {
this.numThreads--;
}
}
/**
* We will not use this connection anymore. If it's not being used, we close
* it. Otherwise, we let release() do it once we are done with it.
* Close a connection. Only idle connections can be closed since
* the RPC proxy would be shut down immediately.
*
* @param force whether the connection should be closed anyway.
*/
public synchronized void close() {
this.closed = true;
if (this.numThreads == 0) {
Object proxy = this.client.getProxy();
// Nobody should be using this anymore so it should close right away
RPC.stopProxy(proxy);
public synchronized void close(boolean force) {
if (!force && this.numThreads > 0) {
// this is an erroneous case but we have to close the connection
// anyway since there will be connection leak if we don't do so
// the connection has been moved out of the pool
LOG.error("Active connection with {} handlers will be closed",
this.numThreads);
}
this.closed = true;
Object proxy = this.client.getProxy();
// Nobody should be using this anymore so it should close right away
RPC.stopProxy(proxy);
}
public synchronized void close() {
close(false);
}
@Override

View File

@ -281,6 +281,42 @@ public class ConnectionManager {
return total;
}
/**
* Get number of idle connections.
*
* @return Number of active connections.
*/
public int getNumIdleConnections() {
int total = 0;
readLock.lock();
try {
for (ConnectionPool pool : this.pools.values()) {
total += pool.getNumIdleConnections();
}
} finally {
readLock.unlock();
}
return total;
}
/**
* Get number of recently active connections.
*
* @return Number of recently active connections.
*/
public int getNumActiveConnectionsRecently() {
int total = 0;
readLock.lock();
try {
for (ConnectionPool pool : this.pools.values()) {
total += pool.getNumActiveConnectionsRecently();
}
} finally {
readLock.unlock();
}
return total;
}
/**
* Get the number of connections to be created.
*
@ -327,12 +363,21 @@ public class ConnectionManager {
// Check if the pool hasn't been active in a while or not 50% are used
long timeSinceLastActive = Time.now() - pool.getLastActiveTime();
int total = pool.getNumConnections();
int active = pool.getNumActiveConnections();
// Active is a transient status in many cases for a connection since
// the handler thread uses the connection very quickly. Thus the number
// of connections with handlers using at the call time is constantly low.
// Recently active is more lasting status and it shows how many
// connections have been used with a recent time period. (i.e. 30 seconds)
int active = pool.getNumActiveConnectionsRecently();
float poolMinActiveRatio = pool.getMinActiveRatio();
if (timeSinceLastActive > connectionCleanupPeriodMs ||
active < poolMinActiveRatio * total) {
// Remove and close 1 connection
List<ConnectionContext> conns = pool.removeConnections(1);
// Be greedy here to close as many connections as possible in one shot
// The number should at least be 1
int targetConnectionsCount = Math.max(1,
(int)(poolMinActiveRatio * total) - active);
List<ConnectionContext> conns =
pool.removeConnections(targetConnectionsCount);
for (ConnectionContext conn : conns) {
conn.close();
}
@ -414,7 +459,7 @@ public class ConnectionManager {
ConnectionPool pool = this.queue.take();
try {
int total = pool.getNumConnections();
int active = pool.getNumActiveConnections();
int active = pool.getNumActiveConnectionsRecently();
float poolMinActiveRatio = pool.getMinActiveRatio();
if (pool.getNumConnections() < pool.getMaxSize() &&
active >= poolMinActiveRatio * total) {

View File

@ -252,19 +252,23 @@ public class ConnectionPool {
*/
public synchronized List<ConnectionContext> removeConnections(int num) {
List<ConnectionContext> removed = new LinkedList<>();
// Remove and close the last connection
List<ConnectionContext> tmpConnections = new ArrayList<>();
for (int i=0; i<this.connections.size(); i++) {
ConnectionContext conn = this.connections.get(i);
if (i < this.minSize || i < this.connections.size() - num) {
tmpConnections.add(conn);
} else {
removed.add(conn);
if (this.connections.size() > this.minSize) {
int targetCount = Math.min(num, this.connections.size() - this.minSize);
// Remove and close targetCount of connections
List<ConnectionContext> tmpConnections = new ArrayList<>();
for (int i = 0; i < this.connections.size(); i++) {
ConnectionContext conn = this.connections.get(i);
// Only pick idle connections to close
if (removed.size() < targetCount && conn.isUsable()) {
removed.add(conn);
} else {
tmpConnections.add(conn);
}
}
this.connections = tmpConnections;
}
this.connections = tmpConnections;
LOG.debug("Expected to remove {} connection " +
"and actually removed {} connections", num, removed.size());
return removed;
}
@ -278,7 +282,7 @@ public class ConnectionPool {
this.connectionPoolId, timeSinceLastActive);
for (ConnectionContext connection : this.connections) {
connection.close();
connection.close(true);
}
this.connections.clear();
}
@ -309,6 +313,39 @@ public class ConnectionPool {
return ret;
}
/**
* Number of usable i.e. no active thread connections.
*
* @return Number of idle connections
*/
protected int getNumIdleConnections() {
int ret = 0;
List<ConnectionContext> tmpConnections = this.connections;
for (ConnectionContext conn : tmpConnections) {
if (conn.isUsable()) {
ret++;
}
}
return ret;
}
/**
* Number of active connections recently in the pool.
*
* @return Number of active connections recently.
*/
protected int getNumActiveConnectionsRecently() {
int ret = 0;
List<ConnectionContext> tmpConnections = this.connections;
for (ConnectionContext conn : tmpConnections) {
if (conn.isActiveRecently()) {
ret++;
}
}
return ret;
}
/**
* Get the last time the connection pool was used.
*
@ -331,12 +368,18 @@ public class ConnectionPool {
public String getJSON() {
final Map<String, String> info = new LinkedHashMap<>();
info.put("active", Integer.toString(getNumActiveConnections()));
info.put("recent_active",
Integer.toString(getNumActiveConnectionsRecently()));
info.put("idle", Integer.toString(getNumIdleConnections()));
info.put("total", Integer.toString(getNumConnections()));
if (LOG.isDebugEnabled()) {
List<ConnectionContext> tmpConnections = this.connections;
for (int i=0; i<tmpConnections.size(); i++) {
ConnectionContext connection = tmpConnections.get(i);
info.put(i + " active", Boolean.toString(connection.isActive()));
info.put(i + " recent_active",
Integer.toString(getNumActiveConnectionsRecently()));
info.put(i + " idle", Boolean.toString(connection.isUsable()));
info.put(i + " closed", Boolean.toString(connection.isClosed()));
}
}

View File

@ -260,6 +260,24 @@ public class RouterRpcClient {
return this.connectionManager.getNumActiveConnections();
}
/**
* Total number of idle sockets between the router and NNs.
*
* @return Number of namenode clients.
*/
public int getNumIdleConnections() {
return this.connectionManager.getNumIdleConnections();
}
/**
* Total number of active sockets between the router and NNs.
*
* @return Number of recently active namenode clients.
*/
public int getNumActiveConnectionsRecently() {
return this.connectionManager.getNumActiveConnectionsRecently();
}
/**
* Total number of open connection pools to a NN. Each connection pool.
* represents one user + one NN.

View File

@ -255,6 +255,9 @@ public class TestConnectionManager {
if (e.getKey().getUgi() == ugi) {
assertEquals(numOfConns, e.getValue().getNumConnections());
assertEquals(numOfActiveConns, e.getValue().getNumActiveConnections());
// idle + active = total connections
assertEquals(numOfConns - numOfActiveConns,
e.getValue().getNumIdleConnections());
connPoolFoundForUser = true;
}
}
@ -265,13 +268,19 @@ public class TestConnectionManager {
@Test
public void testConfigureConnectionActiveRatio() throws IOException {
final int totalConns = 10;
int activeConns = 7;
// test 1 conn below the threshold and these conns are closed
testConnectionCleanup(0.8f, 10, 7, 9);
// test 2 conn below the threshold and these conns are closed
testConnectionCleanup(0.8f, 10, 6, 8);
}
private void testConnectionCleanup(float ratio, int totalConns,
int activeConns, int leftConns) throws IOException {
Configuration tmpConf = new Configuration();
// Set dfs.federation.router.connection.min-active-ratio 0.8f
// Set dfs.federation.router.connection.min-active-ratio
tmpConf.setFloat(
RBFConfigKeys.DFS_ROUTER_NAMENODE_CONNECTION_MIN_ACTIVE_RATIO, 0.8f);
RBFConfigKeys.DFS_ROUTER_NAMENODE_CONNECTION_MIN_ACTIVE_RATIO, ratio);
ConnectionManager tmpConnManager = new ConnectionManager(tmpConf);
tmpConnManager.start();
@ -284,21 +293,20 @@ public class TestConnectionManager {
TEST_NN_ADDRESS, NamenodeProtocol.class);
ConnectionPool pool = poolMap.get(connectionPoolId);
// Test min active ratio is 0.8f
assertEquals(0.8f, pool.getMinActiveRatio(), 0.001f);
// Test min active ratio is as set value
assertEquals(ratio, pool.getMinActiveRatio(), 0.001f);
pool.getConnection().getClient();
// Test there is one active connection in pool
assertEquals(1, pool.getNumActiveConnections());
// Add other 6 active/9 total connections to pool
// Add other active-1 connections / totalConns-1 connections to pool
addConnectionsToPool(pool, totalConns - 1, activeConns - 1);
// There are 7 active connections.
// The active number is less than totalConns(10) * minActiveRatio(0.8f).
// There are activeConn connections.
// We can cleanup the pool
tmpConnManager.cleanup(pool);
assertEquals(totalConns - 1, pool.getNumConnections());
assertEquals(leftConns, pool.getNumConnections());
tmpConnManager.close();
}