HDFS-13637. RBF: Router fails when threadIndex (in ConnectionPool) wraps around Integer.MIN_VALUE. Contributed by CR Hota.
(cherry picked from commit e11d674049
)
This commit is contained in:
parent
f13e01fdfe
commit
d40c49257a
|
@ -29,6 +29,7 @@ import java.util.concurrent.atomic.AtomicInteger;
|
|||
|
||||
import javax.net.SocketFactory;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
|
@ -148,6 +149,14 @@ public class ConnectionPool {
|
|||
return this.connectionPoolId;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the clientIndex used to calculate index for lookup.
|
||||
*/
|
||||
@VisibleForTesting
|
||||
public AtomicInteger getClientIndex() {
|
||||
return this.clientIndex;
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the next connection round-robin.
|
||||
*
|
||||
|
@ -161,7 +170,8 @@ public class ConnectionPool {
|
|||
ConnectionContext conn = null;
|
||||
List<ConnectionContext> tmpConnections = this.connections;
|
||||
int size = tmpConnections.size();
|
||||
int threadIndex = this.clientIndex.getAndIncrement();
|
||||
// Inc and mask off sign bit, lookup index should be non-negative int
|
||||
int threadIndex = this.clientIndex.getAndIncrement() & 0x7FFFFFFF;
|
||||
for (int i=0; i<size; i++) {
|
||||
int index = (threadIndex + i) % size;
|
||||
conn = tmpConnections.get(index);
|
||||
|
|
|
@ -32,6 +32,7 @@ import java.util.Map;
|
|||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
|
||||
/**
|
||||
* Test functionalities of {@link ConnectionManager}, which manages a pool
|
||||
|
@ -149,6 +150,18 @@ public class TestConnectionManager {
|
|||
assertTrue(cc.isActive());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testValidClientIndex() throws Exception {
|
||||
ConnectionPool pool = new ConnectionPool(
|
||||
conf, TEST_NN_ADDRESS, TEST_USER1, 2, 2, ClientProtocol.class);
|
||||
for(int i = -3; i <= 3; i++) {
|
||||
pool.getClientIndex().set(i);
|
||||
ConnectionContext conn = pool.getConnection();
|
||||
assertNotNull(conn);
|
||||
assertTrue(conn.isUsable());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void getGetConnectionNamenodeProtocol() throws Exception {
|
||||
Map<ConnectionPoolId, ConnectionPool> poolMap = connManager.getPools();
|
||||
|
|
Loading…
Reference in New Issue