diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java index ebf15567e68..e94f69b3051 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java @@ -32,6 +32,7 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; +import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.security.UserGroupInformation; @@ -303,6 +304,38 @@ public class ConnectionManager { return JSON.toString(info); } + @VisibleForTesting + Map getPools() { + return this.pools; + } + + /** + * Clean the unused connections for this pool. + * + * @param pool Connection pool to cleanup. + */ + @VisibleForTesting + void cleanup(ConnectionPool pool) { + if (pool.getNumConnections() > pool.getMinSize()) { + // Check if the pool hasn't been active in a while or not 50% are used + long timeSinceLastActive = Time.now() - pool.getLastActiveTime(); + int total = pool.getNumConnections(); + int active = pool.getNumActiveConnections(); + if (timeSinceLastActive > connectionCleanupPeriodMs || + active < MIN_ACTIVE_RATIO * total) { + // Remove and close 1 connection + List conns = pool.removeConnections(1); + for (ConnectionContext conn : conns) { + conn.close(); + } + LOG.debug("Removed connection {} used {} seconds ago. " + + "Pool has {}/{} connections", pool.getConnectionPoolId(), + TimeUnit.MILLISECONDS.toSeconds(timeSinceLastActive), + pool.getNumConnections(), pool.getMaxSize()); + } + } + } + /** * Removes stale connections not accessed recently from the pool. This is * invoked periodically. @@ -350,32 +383,6 @@ public class ConnectionManager { } } } - - /** - * Clean the unused connections for this pool. - * - * @param pool Connection pool to cleanup. - */ - private void cleanup(ConnectionPool pool) { - if (pool.getNumConnections() > pool.getMinSize()) { - // Check if the pool hasn't been active in a while or not 50% are used - long timeSinceLastActive = Time.now() - pool.getLastActiveTime(); - int total = pool.getNumConnections(); - int active = getNumActiveConnections(); - if (timeSinceLastActive > connectionCleanupPeriodMs || - active < MIN_ACTIVE_RATIO * total) { - // Remove and close 1 connection - List conns = pool.removeConnections(1); - for (ConnectionContext conn : conns) { - conn.close(); - } - LOG.debug("Removed connection {} used {} seconds ago. " + - "Pool has {}/{} connections", pool.getConnectionPoolId(), - TimeUnit.MILLISECONDS.toSeconds(timeSinceLastActive), - pool.getNumConnections(), pool.getMaxSize()); - } - } - } } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPoolId.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPoolId.java index a3a78deb79a..6e1ee9a5c4d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPoolId.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPoolId.java @@ -23,6 +23,7 @@ import java.util.Collection; import java.util.Collections; import java.util.List; +import com.google.common.annotations.VisibleForTesting; import org.apache.commons.lang.builder.HashCodeBuilder; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; @@ -99,6 +100,11 @@ public class ConnectionPoolId implements Comparable { return ret; } + @VisibleForTesting + UserGroupInformation getUgi() { + return this.ugi; + } + /** * Get the token identifiers for this connection. * @return List with the token identifiers. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestConnectionManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestConnectionManager.java new file mode 100644 index 00000000000..fe9f1955ed1 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestConnectionManager.java @@ -0,0 +1,114 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.router; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.UserGroupInformation; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.util.Map; + +import static org.junit.Assert.assertEquals; + +/** + * Test functionalities of {@link ConnectionManager}, which manages a pool + * of connections to NameNodes. + */ +public class TestConnectionManager { + private Configuration conf; + private ConnectionManager connManager; + private static final String[] TEST_GROUP = new String[]{"TEST_GROUP"}; + private static final UserGroupInformation TEST_USER1 = + UserGroupInformation.createUserForTesting("user1", TEST_GROUP); + private static final UserGroupInformation TEST_USER2 = + UserGroupInformation.createUserForTesting("user2", TEST_GROUP); + private static final String TEST_NN_ADDRESS = "nn1:8080"; + + @Before + public void setup() throws Exception { + conf = new Configuration(); + connManager = new ConnectionManager(conf); + NetUtils.addStaticResolution("nn1", "localhost"); + NetUtils.createSocketAddrForHost("nn1", 8080); + connManager.start(); + } + + @After + public void shutdown() { + if (connManager != null) { + connManager.close(); + } + } + + @Test + public void testCleanup() throws Exception { + Map poolMap = connManager.getPools(); + + ConnectionPool pool1 = new ConnectionPool( + conf, TEST_NN_ADDRESS, TEST_USER1, 0, 10); + addConnectionsToPool(pool1, 9, 4); + poolMap.put(new ConnectionPoolId(TEST_USER1, TEST_NN_ADDRESS), pool1); + + ConnectionPool pool2 = new ConnectionPool( + conf, TEST_NN_ADDRESS, TEST_USER2, 0, 10); + addConnectionsToPool(pool2, 10, 10); + poolMap.put(new ConnectionPoolId(TEST_USER2, TEST_NN_ADDRESS), pool2); + + checkPoolConnections(TEST_USER1, 9, 4); + checkPoolConnections(TEST_USER2, 10, 10); + + // Clean up first pool, one connection should be removed, and second pool + // should remain the same. + connManager.cleanup(pool1); + checkPoolConnections(TEST_USER1, 8, 4); + checkPoolConnections(TEST_USER2, 10, 10); + + // Clean up the first pool again, it should have no effect since it reached + // the MIN_ACTIVE_RATIO. + connManager.cleanup(pool1); + checkPoolConnections(TEST_USER1, 8, 4); + checkPoolConnections(TEST_USER2, 10, 10); + } + + private void addConnectionsToPool(ConnectionPool pool, int numTotalConn, + int numActiveConn) throws IOException { + for (int i = 0; i < numTotalConn; i++) { + ConnectionContext cc = pool.newConnection(); + pool.addConnection(cc); + if (i < numActiveConn) { + cc.getClient(); + } + } + } + + private void checkPoolConnections(UserGroupInformation ugi, + int numOfConns, int numOfActiveConns) { + for (Map.Entry e : + connManager.getPools().entrySet()) { + if (e.getKey().getUgi() == ugi) { + assertEquals(numOfConns, e.getValue().getNumConnections()); + assertEquals(numOfActiveConns, e.getValue().getNumActiveConnections()); + } + } + } + +}