HDFS-13232. RBF: ConnectionPool should return first usable connection. Contributed by Ekanth S.

This commit is contained in:
Inigo Goiri 2018-03-09 18:25:05 -08:00
parent afe1a3ccd5
commit 8133cd5305
2 changed files with 44 additions and 1 deletions

View File

@ -159,7 +159,7 @@ public class ConnectionPool {
for (int i=0; i<size; i++) {
int index = (threadIndex + i) % size;
conn = tmpConnections.get(index);
if (conn != null && !conn.isUsable()) {
if (conn != null && conn.isUsable()) {
return conn;
}
}

View File

@ -28,6 +28,7 @@ import java.io.IOException;
import java.util.Map;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
/**
* Test functionalities of {@link ConnectionManager}, which manages a pool
@ -41,6 +42,8 @@ public class TestConnectionManager {
UserGroupInformation.createUserForTesting("user1", TEST_GROUP);
private static final UserGroupInformation TEST_USER2 =
UserGroupInformation.createUserForTesting("user2", TEST_GROUP);
private static final UserGroupInformation TEST_USER3 =
UserGroupInformation.createUserForTesting("user3", TEST_GROUP);
private static final String TEST_NN_ADDRESS = "nn1:8080";
@Before
@ -87,6 +90,46 @@ public class TestConnectionManager {
connManager.cleanup(pool1);
checkPoolConnections(TEST_USER1, 8, 4);
checkPoolConnections(TEST_USER2, 10, 10);
// Make sure the number of connections doesn't go below minSize
ConnectionPool pool3 = new ConnectionPool(
conf, TEST_NN_ADDRESS, TEST_USER3, 2, 10);
addConnectionsToPool(pool3, 10, 0);
poolMap.put(new ConnectionPoolId(TEST_USER2, TEST_NN_ADDRESS), pool3);
connManager.cleanup(pool3);
checkPoolConnections(TEST_USER3, 2, 0);
// With active connections added to pool, make sure it honors the
// MIN_ACTIVE_RATIO again
addConnectionsToPool(pool3, 10, 2);
connManager.cleanup(pool3);
checkPoolConnections(TEST_USER3, 4, 2);
}
@Test
public void testGetConnection() throws Exception {
Map<ConnectionPoolId, ConnectionPool> poolMap = connManager.getPools();
final int totalConns = 10;
int activeConns = 5;
ConnectionPool pool = new ConnectionPool(
conf, TEST_NN_ADDRESS, TEST_USER1, 0, 10);
addConnectionsToPool(pool, totalConns, activeConns);
poolMap.put(new ConnectionPoolId(TEST_USER1, TEST_NN_ADDRESS), pool);
// All remaining connections should be usable
final int remainingSlots = totalConns - activeConns;
for (int i = 0; i < remainingSlots; i++) {
ConnectionContext cc = pool.getConnection();
assertTrue(cc.isUsable());
cc.getClient();
activeConns++;
}
checkPoolConnections(TEST_USER1, totalConns, activeConns);
// Ask for more and this returns an active connection
ConnectionContext cc = pool.getConnection();
assertTrue(cc.isActive());
}
private void addConnectionsToPool(ConnectionPool pool, int numTotalConn,