HDFS-13637. RBF: Router fails when threadIndex (in ConnectionPool) wraps around Integer.MIN_VALUE. Contributed by CR Hota.

(cherry picked from commit e11d674049)
This commit is contained in:
Inigo Goiri 2018-06-01 16:59:04 -07:00
parent 0be93a7973
commit fdacc8a088
2 changed files with 24 additions and 1 deletions

View File

@ -29,6 +29,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import javax.net.SocketFactory; import javax.net.SocketFactory;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
@ -148,6 +149,14 @@ public class ConnectionPool {
return this.connectionPoolId; return this.connectionPoolId;
} }
/**
* Get the clientIndex used to calculate index for lookup.
*/
@VisibleForTesting
public AtomicInteger getClientIndex() {
return this.clientIndex;
}
/** /**
* Return the next connection round-robin. * Return the next connection round-robin.
* *
@ -161,7 +170,8 @@ public class ConnectionPool {
ConnectionContext conn = null; ConnectionContext conn = null;
List<ConnectionContext> tmpConnections = this.connections; List<ConnectionContext> tmpConnections = this.connections;
int size = tmpConnections.size(); int size = tmpConnections.size();
int threadIndex = this.clientIndex.getAndIncrement(); // Inc and mask off sign bit, lookup index should be non-negative int
int threadIndex = this.clientIndex.getAndIncrement() & 0x7FFFFFFF;
for (int i=0; i<size; i++) { for (int i=0; i<size; i++) {
int index = (threadIndex + i) % size; int index = (threadIndex + i) % size;
conn = tmpConnections.get(index); conn = tmpConnections.get(index);

View File

@ -32,6 +32,7 @@ import java.util.Map;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
import static org.junit.Assert.assertNotNull;
/** /**
* Test functionalities of {@link ConnectionManager}, which manages a pool * Test functionalities of {@link ConnectionManager}, which manages a pool
@ -149,6 +150,18 @@ public class TestConnectionManager {
assertTrue(cc.isActive()); assertTrue(cc.isActive());
} }
@Test
public void testValidClientIndex() throws Exception {
ConnectionPool pool = new ConnectionPool(
conf, TEST_NN_ADDRESS, TEST_USER1, 2, 2, ClientProtocol.class);
for(int i = -3; i <= 3; i++) {
pool.getClientIndex().set(i);
ConnectionContext conn = pool.getConnection();
assertNotNull(conn);
assertTrue(conn.isUsable());
}
}
@Test @Test
public void getGetConnectionNamenodeProtocol() throws Exception { public void getGetConnectionNamenodeProtocol() throws Exception {
Map<ConnectionPoolId, ConnectionPool> poolMap = connManager.getPools(); Map<ConnectionPoolId, ConnectionPool> poolMap = connManager.getPools();