HDFS-13637. RBF: Router fails when threadIndex (in ConnectionPool) wraps around Integer.MIN_VALUE. Contributed by CR Hota.
This commit is contained in:
parent
4880d890ee
commit
e11d674049
|
@ -29,6 +29,7 @@ import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
import javax.net.SocketFactory;
|
import javax.net.SocketFactory;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.classification.InterfaceStability;
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
@ -148,6 +149,14 @@ public class ConnectionPool {
|
||||||
return this.connectionPoolId;
|
return this.connectionPoolId;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the clientIndex used to calculate index for lookup.
|
||||||
|
*/
|
||||||
|
@VisibleForTesting
|
||||||
|
public AtomicInteger getClientIndex() {
|
||||||
|
return this.clientIndex;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Return the next connection round-robin.
|
* Return the next connection round-robin.
|
||||||
*
|
*
|
||||||
|
@ -161,7 +170,8 @@ public class ConnectionPool {
|
||||||
ConnectionContext conn = null;
|
ConnectionContext conn = null;
|
||||||
List<ConnectionContext> tmpConnections = this.connections;
|
List<ConnectionContext> tmpConnections = this.connections;
|
||||||
int size = tmpConnections.size();
|
int size = tmpConnections.size();
|
||||||
int threadIndex = this.clientIndex.getAndIncrement();
|
// Inc and mask off sign bit, lookup index should be non-negative int
|
||||||
|
int threadIndex = this.clientIndex.getAndIncrement() & 0x7FFFFFFF;
|
||||||
for (int i=0; i<size; i++) {
|
for (int i=0; i<size; i++) {
|
||||||
int index = (threadIndex + i) % size;
|
int index = (threadIndex + i) % size;
|
||||||
conn = tmpConnections.get(index);
|
conn = tmpConnections.get(index);
|
||||||
|
|
|
@ -32,6 +32,7 @@ import java.util.Map;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
import static org.junit.Assert.fail;
|
import static org.junit.Assert.fail;
|
||||||
|
import static org.junit.Assert.assertNotNull;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Test functionalities of {@link ConnectionManager}, which manages a pool
|
* Test functionalities of {@link ConnectionManager}, which manages a pool
|
||||||
|
@ -149,6 +150,18 @@ public class TestConnectionManager {
|
||||||
assertTrue(cc.isActive());
|
assertTrue(cc.isActive());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testValidClientIndex() throws Exception {
|
||||||
|
ConnectionPool pool = new ConnectionPool(
|
||||||
|
conf, TEST_NN_ADDRESS, TEST_USER1, 2, 2, ClientProtocol.class);
|
||||||
|
for(int i = -3; i <= 3; i++) {
|
||||||
|
pool.getClientIndex().set(i);
|
||||||
|
ConnectionContext conn = pool.getConnection();
|
||||||
|
assertNotNull(conn);
|
||||||
|
assertTrue(conn.isUsable());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void getGetConnectionNamenodeProtocol() throws Exception {
|
public void getGetConnectionNamenodeProtocol() throws Exception {
|
||||||
Map<ConnectionPoolId, ConnectionPool> poolMap = connManager.getPools();
|
Map<ConnectionPoolId, ConnectionPool> poolMap = connManager.getPools();
|
||||||
|
|
Loading…
Reference in New Issue