HBASE-10737 HConnectionImplementation should stop RpcClient on close
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1577304 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
db3482c61e
commit
889ff7cbed
|
@ -23,7 +23,6 @@ import java.io.IOException;
|
||||||
import java.io.InterruptedIOException;
|
import java.io.InterruptedIOException;
|
||||||
import java.lang.reflect.Constructor;
|
import java.lang.reflect.Constructor;
|
||||||
import java.lang.reflect.UndeclaredThrowableException;
|
import java.lang.reflect.UndeclaredThrowableException;
|
||||||
import java.net.SocketException;
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Date;
|
import java.util.Date;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
|
@ -48,7 +47,6 @@ import java.util.concurrent.atomic.AtomicInteger;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.classification.InterfaceStability;
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hbase.Chore;
|
import org.apache.hadoop.hbase.Chore;
|
||||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||||
|
@ -2441,6 +2439,9 @@ class ConnectionManager {
|
||||||
if (clusterStatusListener != null) {
|
if (clusterStatusListener != null) {
|
||||||
clusterStatusListener.close();
|
clusterStatusListener.close();
|
||||||
}
|
}
|
||||||
|
if (rpcClient != null) {
|
||||||
|
rpcClient.stop();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -2532,7 +2533,7 @@ class ConnectionManager {
|
||||||
@Override
|
@Override
|
||||||
public HTableDescriptor[] getHTableDescriptors(
|
public HTableDescriptor[] getHTableDescriptors(
|
||||||
List<String> names) throws IOException {
|
List<String> names) throws IOException {
|
||||||
List<TableName> tableNames = new ArrayList(names.size());
|
List<TableName> tableNames = new ArrayList<TableName>(names.size());
|
||||||
for(String name : names) {
|
for(String name : names) {
|
||||||
tableNames.add(TableName.valueOf(name));
|
tableNames.add(TableName.valueOf(name));
|
||||||
}
|
}
|
||||||
|
|
|
@ -26,8 +26,6 @@ import static org.junit.Assert.fail;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collections;
|
|
||||||
import java.util.Comparator;
|
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
@ -58,7 +56,6 @@ import org.apache.hadoop.hbase.TableNotEnabledException;
|
||||||
import org.apache.hadoop.hbase.TableNotFoundException;
|
import org.apache.hadoop.hbase.TableNotFoundException;
|
||||||
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
|
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
|
||||||
import org.apache.hadoop.hbase.catalog.CatalogTracker;
|
import org.apache.hadoop.hbase.catalog.CatalogTracker;
|
||||||
import org.apache.hadoop.hbase.catalog.MetaReader;
|
|
||||||
import org.apache.hadoop.hbase.constraint.ConstraintException;
|
import org.apache.hadoop.hbase.constraint.ConstraintException;
|
||||||
import org.apache.hadoop.hbase.executor.EventHandler;
|
import org.apache.hadoop.hbase.executor.EventHandler;
|
||||||
import org.apache.hadoop.hbase.master.AssignmentManager;
|
import org.apache.hadoop.hbase.master.AssignmentManager;
|
||||||
|
|
|
@ -81,14 +81,7 @@ public class TestClientTimeouts {
|
||||||
HConnection lastConnection = null;
|
HConnection lastConnection = null;
|
||||||
boolean lastFailed = false;
|
boolean lastFailed = false;
|
||||||
int initialInvocations = RandomTimeoutBlockingRpcChannel.invokations.get();
|
int initialInvocations = RandomTimeoutBlockingRpcChannel.invokations.get();
|
||||||
RpcClient rpcClient = new RpcClient(TEST_UTIL.getConfiguration(), TEST_UTIL.getClusterKey()) {
|
RpcClient rpcClient = newRandomTimeoutRpcClient();
|
||||||
// Return my own instance, one that does random timeouts
|
|
||||||
@Override
|
|
||||||
public BlockingRpcChannel createBlockingRpcChannel(ServerName sn,
|
|
||||||
User ticket, int rpcTimeout) {
|
|
||||||
return new RandomTimeoutBlockingRpcChannel(this, sn, ticket, rpcTimeout);
|
|
||||||
}
|
|
||||||
};
|
|
||||||
try {
|
try {
|
||||||
for (int i = 0; i < 5 || (lastFailed && i < 100); ++i) {
|
for (int i = 0; i < 5 || (lastFailed && i < 100); ++i) {
|
||||||
lastFailed = false;
|
lastFailed = false;
|
||||||
|
@ -102,8 +95,12 @@ public class TestClientTimeouts {
|
||||||
assertFalse(connection == lastConnection);
|
assertFalse(connection == lastConnection);
|
||||||
lastConnection = connection;
|
lastConnection = connection;
|
||||||
// Override the connection's rpc client for timeout testing
|
// Override the connection's rpc client for timeout testing
|
||||||
((ConnectionManager.HConnectionImplementation)connection).setRpcClient(
|
RpcClient oldRpcClient =
|
||||||
|
((ConnectionManager.HConnectionImplementation)connection).setRpcClient(
|
||||||
rpcClient);
|
rpcClient);
|
||||||
|
if (oldRpcClient != null) {
|
||||||
|
oldRpcClient.stop();
|
||||||
|
}
|
||||||
// run some admin commands
|
// run some admin commands
|
||||||
HBaseAdmin.checkHBaseAvailable(conf);
|
HBaseAdmin.checkHBaseAvailable(conf);
|
||||||
admin.setBalancerRunning(false, false);
|
admin.setBalancerRunning(false, false);
|
||||||
|
@ -113,6 +110,9 @@ public class TestClientTimeouts {
|
||||||
lastFailed = true;
|
lastFailed = true;
|
||||||
} finally {
|
} finally {
|
||||||
admin.close();
|
admin.close();
|
||||||
|
if (admin.getConnection().isClosed()) {
|
||||||
|
rpcClient = newRandomTimeoutRpcClient();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// Ensure the RandomTimeoutRpcEngine is actually being used.
|
// Ensure the RandomTimeoutRpcEngine is actually being used.
|
||||||
|
@ -123,6 +123,18 @@ public class TestClientTimeouts {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static RpcClient newRandomTimeoutRpcClient() {
|
||||||
|
return new RpcClient(
|
||||||
|
TEST_UTIL.getConfiguration(), TEST_UTIL.getClusterKey()) {
|
||||||
|
// Return my own instance, one that does random timeouts
|
||||||
|
@Override
|
||||||
|
public BlockingRpcChannel createBlockingRpcChannel(ServerName sn,
|
||||||
|
User ticket, int rpcTimeout) {
|
||||||
|
return new RandomTimeoutBlockingRpcChannel(this, sn, ticket, rpcTimeout);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Blocking rpc channel that goes via hbase rpc.
|
* Blocking rpc channel that goes via hbase rpc.
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -252,6 +252,7 @@ public class TestHCM {
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
t.close();
|
||||||
hci.getClient(sn); // will throw an exception: RegionServerStoppedException
|
hci.getClient(sn); // will throw an exception: RegionServerStoppedException
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -300,6 +301,8 @@ public class TestHCM {
|
||||||
LOG.info("We received an exception, as expected ", e);
|
LOG.info("We received an exception, as expected ", e);
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
Assert.fail("Wrong exception:" + e.getMessage());
|
Assert.fail("Wrong exception:" + e.getMessage());
|
||||||
|
} finally {
|
||||||
|
table.close();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -369,13 +372,14 @@ public class TestHCM {
|
||||||
step.compareAndSet(1, 2);
|
step.compareAndSet(1, 2);
|
||||||
// The test may fail here if the thread doing the gets is stuck. The way to find
|
// The test may fail here if the thread doing the gets is stuck. The way to find
|
||||||
// out what's happening is to look for the thread named 'testConnectionCloseThread'
|
// out what's happening is to look for the thread named 'testConnectionCloseThread'
|
||||||
TEST_UTIL.waitFor(20000, new Waiter.Predicate<Exception>() {
|
TEST_UTIL.waitFor(40000, new Waiter.Predicate<Exception>() {
|
||||||
@Override
|
@Override
|
||||||
public boolean evaluate() throws Exception {
|
public boolean evaluate() throws Exception {
|
||||||
return step.get() == 3;
|
return step.get() == 3;
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
table.close();
|
||||||
Assert.assertTrue("Unexpected exception is " + failed.get(), failed.get() == null);
|
Assert.assertTrue("Unexpected exception is " + failed.get(), failed.get() == null);
|
||||||
TEST_UTIL.getHBaseAdmin().setBalancerRunning(previousBalance, true);
|
TEST_UTIL.getHBaseAdmin().setBalancerRunning(previousBalance, true);
|
||||||
}
|
}
|
||||||
|
@ -431,6 +435,7 @@ public class TestHCM {
|
||||||
|
|
||||||
LOG.info("we're done - time will change back");
|
LOG.info("we're done - time will change back");
|
||||||
|
|
||||||
|
table.close();
|
||||||
EnvironmentEdgeManager.reset();
|
EnvironmentEdgeManager.reset();
|
||||||
TEST_UTIL.getHBaseAdmin().setBalancerRunning(previousBalance, true);
|
TEST_UTIL.getHBaseAdmin().setBalancerRunning(previousBalance, true);
|
||||||
}
|
}
|
||||||
|
@ -736,7 +741,7 @@ public class TestHCM {
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
public void testConnectionManagement() throws Exception{
|
public void testConnectionManagement() throws Exception{
|
||||||
TEST_UTIL.createTable(TABLE_NAME1, FAM_NAM);
|
HTable table0 = TEST_UTIL.createTable(TABLE_NAME1, FAM_NAM);
|
||||||
HConnection conn = HConnectionManager.createConnection(TEST_UTIL.getConfiguration());
|
HConnection conn = HConnectionManager.createConnection(TEST_UTIL.getConfiguration());
|
||||||
HTableInterface table = conn.getTable(TABLE_NAME1.getName());
|
HTableInterface table = conn.getTable(TABLE_NAME1.getName());
|
||||||
table.close();
|
table.close();
|
||||||
|
@ -747,6 +752,7 @@ public class TestHCM {
|
||||||
assertFalse(((HTable)table).getPool().isShutdown());
|
assertFalse(((HTable)table).getPool().isShutdown());
|
||||||
conn.close();
|
conn.close();
|
||||||
assertTrue(((HTable)table).getPool().isShutdown());
|
assertTrue(((HTable)table).getPool().isShutdown());
|
||||||
|
table0.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -794,6 +800,7 @@ public class TestHCM {
|
||||||
ServerName.valueOf("127.0.0.1", nextPort, 0), location.getSeqNum() - 1);
|
ServerName.valueOf("127.0.0.1", nextPort, 0), location.getSeqNum() - 1);
|
||||||
location = conn.getCachedLocation(TABLE_NAME2, ROW);
|
location = conn.getCachedLocation(TABLE_NAME2, ROW);
|
||||||
Assert.assertEquals(nextPort - 1, location.getPort());
|
Assert.assertEquals(nextPort - 1, location.getPort());
|
||||||
|
table.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
Loading…
Reference in New Issue