HBASE-13248 Make HConnectionImplementation top-level class
This commit is contained in:
parent
e78aeb24e0
commit
8579c6dd49
|
@ -221,7 +221,7 @@ public class ConnectionFactory {
|
||||||
final ExecutorService pool, final User user)
|
final ExecutorService pool, final User user)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
String className = conf.get(HConnection.HBASE_CLIENT_CONNECTION_IMPL,
|
String className = conf.get(HConnection.HBASE_CLIENT_CONNECTION_IMPL,
|
||||||
ConnectionManager.HConnectionImplementation.class.getName());
|
ConnectionImplementation.class.getName());
|
||||||
Class<?> clazz = null;
|
Class<?> clazz = null;
|
||||||
try {
|
try {
|
||||||
clazz = Class.forName(className);
|
clazz = Class.forName(className);
|
||||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -82,7 +82,7 @@ public final class ConnectionUtils {
|
||||||
*/
|
*/
|
||||||
public static NonceGenerator injectNonceGeneratorForTesting(
|
public static NonceGenerator injectNonceGeneratorForTesting(
|
||||||
ClusterConnection conn, NonceGenerator cnm) {
|
ClusterConnection conn, NonceGenerator cnm) {
|
||||||
return ConnectionManager.injectNonceGeneratorForTesting(conn, cnm);
|
return ConnectionImplementation.injectNonceGeneratorForTesting(conn, cnm);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -146,7 +146,7 @@ public final class ConnectionUtils {
|
||||||
* Some tests shut down the master. But table availability is a master RPC which is performed on
|
* Some tests shut down the master. But table availability is a master RPC which is performed on
|
||||||
* region re-lookups.
|
* region re-lookups.
|
||||||
*/
|
*/
|
||||||
static class MasterlessConnection extends ConnectionManager.HConnectionImplementation {
|
static class MasterlessConnection extends ConnectionImplementation {
|
||||||
MasterlessConnection(Configuration conf, boolean managed,
|
MasterlessConnection(Configuration conf, boolean managed,
|
||||||
ExecutorService pool, User user) throws IOException {
|
ExecutorService pool, User user) throws IOException {
|
||||||
super(conf, managed, pool, user);
|
super(conf, managed, pool, user);
|
||||||
|
|
|
@ -2561,7 +2561,7 @@ public class HBaseAdmin implements Admin {
|
||||||
ZooKeeperKeepAliveConnection zkw = null;
|
ZooKeeperKeepAliveConnection zkw = null;
|
||||||
try {
|
try {
|
||||||
// This is NASTY. FIX!!!! Dependent on internal implementation! TODO
|
// This is NASTY. FIX!!!! Dependent on internal implementation! TODO
|
||||||
zkw = ((ConnectionManager.HConnectionImplementation)connection).
|
zkw = ((ConnectionImplementation)connection).
|
||||||
getKeepAliveZooKeeperWatcher();
|
getKeepAliveZooKeeperWatcher();
|
||||||
zkw.getRecoverableZooKeeper().getZooKeeper().exists(zkw.baseZNode, false);
|
zkw.getRecoverableZooKeeper().getZooKeeper().exists(zkw.baseZNode, false);
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
|
@ -3781,8 +3781,8 @@ public class HBaseAdmin implements Admin {
|
||||||
@Override
|
@Override
|
||||||
public int getMasterInfoPort() throws IOException {
|
public int getMasterInfoPort() throws IOException {
|
||||||
// TODO: Fix! Reaching into internal implementation!!!!
|
// TODO: Fix! Reaching into internal implementation!!!!
|
||||||
ConnectionManager.HConnectionImplementation connection =
|
ConnectionImplementation connection =
|
||||||
(ConnectionManager.HConnectionImplementation)this.connection;
|
(ConnectionImplementation)this.connection;
|
||||||
ZooKeeperKeepAliveConnection zkw = connection.getKeepAliveZooKeeperWatcher();
|
ZooKeeperKeepAliveConnection zkw = connection.getKeepAliveZooKeeperWatcher();
|
||||||
try {
|
try {
|
||||||
return MasterAddressTracker.getMasterInfoPort(zkw);
|
return MasterAddressTracker.getMasterInfoPort(zkw);
|
||||||
|
|
|
@ -27,7 +27,7 @@ import org.apache.hadoop.hbase.HConstants;
|
||||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Factory implementation to provide the {@link HConnectionImplementation} with
|
* Factory implementation to provide the {@link ConnectionImplementation} with
|
||||||
* the implementation of the {@link RetryingCallerInterceptor} that we would use
|
* the implementation of the {@link RetryingCallerInterceptor} that we would use
|
||||||
* to intercept the {@link RpcRetryingCaller} during the course of their calls.
|
* to intercept the {@link RpcRetryingCaller} during the course of their calls.
|
||||||
*
|
*
|
||||||
|
|
|
@ -39,14 +39,14 @@ import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
|
||||||
class ZooKeeperKeepAliveConnection extends ZooKeeperWatcher{
|
class ZooKeeperKeepAliveConnection extends ZooKeeperWatcher{
|
||||||
ZooKeeperKeepAliveConnection(
|
ZooKeeperKeepAliveConnection(
|
||||||
Configuration conf, String descriptor,
|
Configuration conf, String descriptor,
|
||||||
ConnectionManager.HConnectionImplementation conn) throws IOException {
|
ConnectionImplementation conn) throws IOException {
|
||||||
super(conf, descriptor, conn);
|
super(conf, descriptor, conn);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void close() {
|
public void close() {
|
||||||
if (this.abortable != null) {
|
if (this.abortable != null) {
|
||||||
((ConnectionManager.HConnectionImplementation)abortable).releaseZooKeeperWatcher(this);
|
((ConnectionImplementation)abortable).releaseZooKeeperWatcher(this);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -37,14 +37,14 @@ import org.apache.zookeeper.KeeperException;
|
||||||
class ZooKeeperRegistry implements Registry {
|
class ZooKeeperRegistry implements Registry {
|
||||||
static final Log LOG = LogFactory.getLog(ZooKeeperRegistry.class);
|
static final Log LOG = LogFactory.getLog(ZooKeeperRegistry.class);
|
||||||
// Needs an instance of hci to function. Set after construct this instance.
|
// Needs an instance of hci to function. Set after construct this instance.
|
||||||
ConnectionManager.HConnectionImplementation hci;
|
ConnectionImplementation hci;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void init(Connection connection) {
|
public void init(Connection connection) {
|
||||||
if (!(connection instanceof ConnectionManager.HConnectionImplementation)) {
|
if (!(connection instanceof ConnectionImplementation)) {
|
||||||
throw new RuntimeException("This registry depends on HConnectionImplementation");
|
throw new RuntimeException("This registry depends on ConnectionImplementation");
|
||||||
}
|
}
|
||||||
this.hci = (ConnectionManager.HConnectionImplementation)connection;
|
this.hci = (ConnectionImplementation)connection;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -338,7 +338,7 @@ public class TestAsyncProcess {
|
||||||
/**
|
/**
|
||||||
* Returns our async process.
|
* Returns our async process.
|
||||||
*/
|
*/
|
||||||
static class MyConnectionImpl extends ConnectionManager.HConnectionImplementation {
|
static class MyConnectionImpl extends ConnectionImplementation {
|
||||||
final AtomicInteger nbThreads = new AtomicInteger(0);
|
final AtomicInteger nbThreads = new AtomicInteger(0);
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -103,7 +103,7 @@ public class TestClientNoCluster extends Configured implements Tool {
|
||||||
@Before
|
@Before
|
||||||
public void setUp() throws Exception {
|
public void setUp() throws Exception {
|
||||||
this.conf = HBaseConfiguration.create();
|
this.conf = HBaseConfiguration.create();
|
||||||
// Run my HConnection overrides. Use my little HConnectionImplementation below which
|
// Run my HConnection overrides. Use my little ConnectionImplementation below which
|
||||||
// allows me insert mocks and also use my Registry below rather than the default zk based
|
// allows me insert mocks and also use my Registry below rather than the default zk based
|
||||||
// one so tests run faster and don't have zk dependency.
|
// one so tests run faster and don't have zk dependency.
|
||||||
this.conf.set("hbase.client.registry.impl", SimpleRegistry.class.getName());
|
this.conf.set("hbase.client.registry.impl", SimpleRegistry.class.getName());
|
||||||
|
@ -262,7 +262,7 @@ public class TestClientNoCluster extends Configured implements Tool {
|
||||||
* Override to shutdown going to zookeeper for cluster id and meta location.
|
* Override to shutdown going to zookeeper for cluster id and meta location.
|
||||||
*/
|
*/
|
||||||
static class ScanOpenNextThenExceptionThenRecoverConnection
|
static class ScanOpenNextThenExceptionThenRecoverConnection
|
||||||
extends ConnectionManager.HConnectionImplementation {
|
extends ConnectionImplementation {
|
||||||
final ClientService.BlockingInterface stub;
|
final ClientService.BlockingInterface stub;
|
||||||
|
|
||||||
ScanOpenNextThenExceptionThenRecoverConnection(Configuration conf,
|
ScanOpenNextThenExceptionThenRecoverConnection(Configuration conf,
|
||||||
|
@ -294,7 +294,7 @@ public class TestClientNoCluster extends Configured implements Tool {
|
||||||
* Override to shutdown going to zookeeper for cluster id and meta location.
|
* Override to shutdown going to zookeeper for cluster id and meta location.
|
||||||
*/
|
*/
|
||||||
static class RegionServerStoppedOnScannerOpenConnection
|
static class RegionServerStoppedOnScannerOpenConnection
|
||||||
extends ConnectionManager.HConnectionImplementation {
|
extends ConnectionImplementation {
|
||||||
final ClientService.BlockingInterface stub;
|
final ClientService.BlockingInterface stub;
|
||||||
|
|
||||||
RegionServerStoppedOnScannerOpenConnection(Configuration conf, boolean managed,
|
RegionServerStoppedOnScannerOpenConnection(Configuration conf, boolean managed,
|
||||||
|
@ -326,7 +326,7 @@ public class TestClientNoCluster extends Configured implements Tool {
|
||||||
* Override to check we are setting rpc timeout right.
|
* Override to check we are setting rpc timeout right.
|
||||||
*/
|
*/
|
||||||
static class RpcTimeoutConnection
|
static class RpcTimeoutConnection
|
||||||
extends ConnectionManager.HConnectionImplementation {
|
extends ConnectionImplementation {
|
||||||
final ClientService.BlockingInterface stub;
|
final ClientService.BlockingInterface stub;
|
||||||
|
|
||||||
RpcTimeoutConnection(Configuration conf, boolean managed, ExecutorService pool, User user)
|
RpcTimeoutConnection(Configuration conf, boolean managed, ExecutorService pool, User user)
|
||||||
|
@ -353,7 +353,7 @@ public class TestClientNoCluster extends Configured implements Tool {
|
||||||
* Fake many regionservers and many regions on a connection implementation.
|
* Fake many regionservers and many regions on a connection implementation.
|
||||||
*/
|
*/
|
||||||
static class ManyServersManyRegionsConnection
|
static class ManyServersManyRegionsConnection
|
||||||
extends ConnectionManager.HConnectionImplementation {
|
extends ConnectionImplementation {
|
||||||
// All access should be synchronized
|
// All access should be synchronized
|
||||||
final Map<ServerName, ClientService.BlockingInterface> serversByClient;
|
final Map<ServerName, ClientService.BlockingInterface> serversByClient;
|
||||||
|
|
||||||
|
|
|
@ -72,8 +72,8 @@ public class TestSnapshotFromAdmin {
|
||||||
+ "- further testing won't prove anything.", time < ignoreExpectedTime);
|
+ "- further testing won't prove anything.", time < ignoreExpectedTime);
|
||||||
|
|
||||||
// setup the mocks
|
// setup the mocks
|
||||||
ConnectionManager.HConnectionImplementation mockConnection = Mockito
|
ConnectionImplementation mockConnection = Mockito
|
||||||
.mock(ConnectionManager.HConnectionImplementation.class);
|
.mock(ConnectionImplementation.class);
|
||||||
Configuration conf = HBaseConfiguration.create();
|
Configuration conf = HBaseConfiguration.create();
|
||||||
// setup the conf to match the expected properties
|
// setup the conf to match the expected properties
|
||||||
conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, numRetries);
|
conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, numRetries);
|
||||||
|
@ -119,8 +119,8 @@ public class TestSnapshotFromAdmin {
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
public void testValidateSnapshotName() throws Exception {
|
public void testValidateSnapshotName() throws Exception {
|
||||||
ConnectionManager.HConnectionImplementation mockConnection = Mockito
|
ConnectionImplementation mockConnection = Mockito
|
||||||
.mock(ConnectionManager.HConnectionImplementation.class);
|
.mock(ConnectionImplementation.class);
|
||||||
Configuration conf = HBaseConfiguration.create();
|
Configuration conf = HBaseConfiguration.create();
|
||||||
Mockito.when(mockConnection.getConfiguration()).thenReturn(conf);
|
Mockito.when(mockConnection.getConfiguration()).thenReturn(conf);
|
||||||
Admin admin = new HBaseAdmin(mockConnection);
|
Admin admin = new HBaseAdmin(mockConnection);
|
||||||
|
|
|
@ -63,4 +63,4 @@ log4j.logger.org.apache.hadoop.hbase=DEBUG
|
||||||
log4j.org.apache.hadoop.metrics2.impl.MetricsSystemImpl=ERROR
|
log4j.org.apache.hadoop.metrics2.impl.MetricsSystemImpl=ERROR
|
||||||
log4j.org.apache.hadoop.metrics2.util.MBeans=ERROR
|
log4j.org.apache.hadoop.metrics2.util.MBeans=ERROR
|
||||||
# Enable this to get detailed connection error/retry logging.
|
# Enable this to get detailed connection error/retry logging.
|
||||||
# log4j.logger.org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation=TRACE
|
# log4j.logger.org.apache.hadoop.hbase.client.ConnectionImplementation=TRACE
|
||||||
|
|
|
@ -63,4 +63,4 @@ log4j.logger.org.apache.hadoop.hbase=DEBUG
|
||||||
log4j.org.apache.hadoop.metrics2.impl.MetricsSystemImpl=ERROR
|
log4j.org.apache.hadoop.metrics2.impl.MetricsSystemImpl=ERROR
|
||||||
log4j.org.apache.hadoop.metrics2.util.MBeans=ERROR
|
log4j.org.apache.hadoop.metrics2.util.MBeans=ERROR
|
||||||
# Enable this to get detailed connection error/retry logging.
|
# Enable this to get detailed connection error/retry logging.
|
||||||
# log4j.logger.org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation=TRACE
|
# log4j.logger.org.apache.hadoop.hbase.client.ConnectionImplementation=TRACE
|
||||||
|
|
|
@ -24,7 +24,6 @@ import org.apache.hadoop.hbase.classification.InterfaceStability;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hbase.CoprocessorEnvironment;
|
import org.apache.hadoop.hbase.CoprocessorEnvironment;
|
||||||
import org.apache.hadoop.hbase.ServerName;
|
import org.apache.hadoop.hbase.ServerName;
|
||||||
import org.apache.hadoop.hbase.client.ConnectionManager.HConnectionImplementation;
|
|
||||||
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
|
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
|
||||||
import org.apache.hadoop.hbase.regionserver.HRegionServer;
|
import org.apache.hadoop.hbase.regionserver.HRegionServer;
|
||||||
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
|
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
|
||||||
|
@ -40,7 +39,7 @@ import org.apache.hadoop.hbase.security.UserProvider;
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
@InterfaceStability.Evolving
|
@InterfaceStability.Evolving
|
||||||
public class CoprocessorHConnection extends HConnectionImplementation {
|
public class CoprocessorHConnection extends ConnectionImplementation {
|
||||||
private static final NonceGenerator NO_NONCE_GEN = new ConnectionManager.NoNonceGenerator();
|
private static final NonceGenerator NO_NONCE_GEN = new ConnectionManager.NoNonceGenerator();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -130,11 +130,11 @@ public class MultiHConnection {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
// Copied from HConnectionImplementation.getBatchPool()
|
// Copied from ConnectionImplementation.getBatchPool()
|
||||||
// We should get rid of this when HConnection.processBatchCallback is un-deprecated and provides
|
// We should get rid of this when HConnection.processBatchCallback is un-deprecated and provides
|
||||||
// an API to manage a batch pool
|
// an API to manage a batch pool
|
||||||
private void createBatchPool(Configuration conf) {
|
private void createBatchPool(Configuration conf) {
|
||||||
// Use the same config for keep alive as in HConnectionImplementation.getBatchPool();
|
// Use the same config for keep alive as in ConnectionImplementation.getBatchPool();
|
||||||
int maxThreads = conf.getInt("hbase.multihconnection.threads.max", 256);
|
int maxThreads = conf.getInt("hbase.multihconnection.threads.max", 256);
|
||||||
int coreThreads = conf.getInt("hbase.multihconnection.threads.core", 256);
|
int coreThreads = conf.getInt("hbase.multihconnection.threads.core", 256);
|
||||||
if (maxThreads == 0) {
|
if (maxThreads == 0) {
|
||||||
|
|
|
@ -28,7 +28,6 @@ import org.apache.hadoop.hbase.ServerName;
|
||||||
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
|
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
|
||||||
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
|
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
|
||||||
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
|
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
|
||||||
import org.apache.hadoop.hbase.client.ConnectionManager.HConnectionImplementation;
|
|
||||||
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
|
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
|
||||||
import org.mockito.Mockito;
|
import org.mockito.Mockito;
|
||||||
|
|
||||||
|
@ -55,10 +54,10 @@ public class HConnectionTestingUtility {
|
||||||
throws ZooKeeperConnectionException {
|
throws ZooKeeperConnectionException {
|
||||||
HConnectionKey connectionKey = new HConnectionKey(conf);
|
HConnectionKey connectionKey = new HConnectionKey(conf);
|
||||||
synchronized (ConnectionManager.CONNECTION_INSTANCES) {
|
synchronized (ConnectionManager.CONNECTION_INSTANCES) {
|
||||||
HConnectionImplementation connection =
|
ConnectionImplementation connection =
|
||||||
ConnectionManager.CONNECTION_INSTANCES.get(connectionKey);
|
ConnectionManager.CONNECTION_INSTANCES.get(connectionKey);
|
||||||
if (connection == null) {
|
if (connection == null) {
|
||||||
connection = Mockito.mock(HConnectionImplementation.class);
|
connection = Mockito.mock(ConnectionImplementation.class);
|
||||||
Mockito.when(connection.getConfiguration()).thenReturn(conf);
|
Mockito.when(connection.getConfiguration()).thenReturn(conf);
|
||||||
ConnectionManager.CONNECTION_INSTANCES.put(connectionKey, connection);
|
ConnectionManager.CONNECTION_INSTANCES.put(connectionKey, connection);
|
||||||
}
|
}
|
||||||
|
@ -98,7 +97,7 @@ public class HConnectionTestingUtility {
|
||||||
final ClientProtos.ClientService.BlockingInterface client,
|
final ClientProtos.ClientService.BlockingInterface client,
|
||||||
final ServerName sn, final HRegionInfo hri)
|
final ServerName sn, final HRegionInfo hri)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
HConnectionImplementation c = Mockito.mock(HConnectionImplementation.class);
|
ConnectionImplementation c = Mockito.mock(ConnectionImplementation.class);
|
||||||
Mockito.when(c.getConfiguration()).thenReturn(conf);
|
Mockito.when(c.getConfiguration()).thenReturn(conf);
|
||||||
ConnectionManager.CONNECTION_INSTANCES.put(new HConnectionKey(conf), c);
|
ConnectionManager.CONNECTION_INSTANCES.put(new HConnectionKey(conf), c);
|
||||||
Mockito.doNothing().when(c).close();
|
Mockito.doNothing().when(c).close();
|
||||||
|
@ -154,10 +153,10 @@ public class HConnectionTestingUtility {
|
||||||
throws IOException {
|
throws IOException {
|
||||||
HConnectionKey connectionKey = new HConnectionKey(conf);
|
HConnectionKey connectionKey = new HConnectionKey(conf);
|
||||||
synchronized (ConnectionManager.CONNECTION_INSTANCES) {
|
synchronized (ConnectionManager.CONNECTION_INSTANCES) {
|
||||||
HConnectionImplementation connection =
|
ConnectionImplementation connection =
|
||||||
ConnectionManager.CONNECTION_INSTANCES.get(connectionKey);
|
ConnectionManager.CONNECTION_INSTANCES.get(connectionKey);
|
||||||
if (connection == null) {
|
if (connection == null) {
|
||||||
connection = Mockito.spy(new HConnectionImplementation(conf, false));
|
connection = Mockito.spy(new ConnectionImplementation(conf, false));
|
||||||
ConnectionManager.CONNECTION_INSTANCES.put(connectionKey, connection);
|
ConnectionManager.CONNECTION_INSTANCES.put(connectionKey, connection);
|
||||||
}
|
}
|
||||||
return connection;
|
return connection;
|
||||||
|
@ -168,10 +167,10 @@ public class HConnectionTestingUtility {
|
||||||
throws IOException {
|
throws IOException {
|
||||||
HConnectionKey connectionKey = new HConnectionKey(conf);
|
HConnectionKey connectionKey = new HConnectionKey(conf);
|
||||||
synchronized (ConnectionManager.CONNECTION_INSTANCES) {
|
synchronized (ConnectionManager.CONNECTION_INSTANCES) {
|
||||||
HConnectionImplementation connection =
|
ConnectionImplementation connection =
|
||||||
ConnectionManager.CONNECTION_INSTANCES.get(connectionKey);
|
ConnectionManager.CONNECTION_INSTANCES.get(connectionKey);
|
||||||
if (connection == null) {
|
if (connection == null) {
|
||||||
connection = Mockito.spy(new HConnectionImplementation(conf, false));
|
connection = Mockito.spy(new ConnectionImplementation(conf, false));
|
||||||
ConnectionManager.CONNECTION_INSTANCES.put(connectionKey, connection);
|
ConnectionManager.CONNECTION_INSTANCES.put(connectionKey, connection);
|
||||||
}
|
}
|
||||||
return connection;
|
return connection;
|
||||||
|
|
|
@ -29,7 +29,6 @@ import java.io.IOException;
|
||||||
import java.lang.reflect.Field;
|
import java.lang.reflect.Field;
|
||||||
import java.lang.reflect.Modifier;
|
import java.lang.reflect.Modifier;
|
||||||
import java.net.SocketTimeoutException;
|
import java.net.SocketTimeoutException;
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
@ -56,7 +55,6 @@ import org.apache.hadoop.hbase.RegionLocations;
|
||||||
import org.apache.hadoop.hbase.ServerName;
|
import org.apache.hadoop.hbase.ServerName;
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
import org.apache.hadoop.hbase.Waiter;
|
import org.apache.hadoop.hbase.Waiter;
|
||||||
import org.apache.hadoop.hbase.client.ConnectionManager.HConnectionImplementation;
|
|
||||||
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
|
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
|
||||||
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
|
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
|
||||||
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
|
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
|
||||||
|
@ -158,13 +156,13 @@ public class TestHCM {
|
||||||
HConnection con1 = ConnectionManager.createConnection(TEST_UTIL.getConfiguration());
|
HConnection con1 = ConnectionManager.createConnection(TEST_UTIL.getConfiguration());
|
||||||
HConnection con2 = ConnectionManager.createConnection(TEST_UTIL.getConfiguration(), otherPool);
|
HConnection con2 = ConnectionManager.createConnection(TEST_UTIL.getConfiguration(), otherPool);
|
||||||
// make sure the internally created ExecutorService is the one passed
|
// make sure the internally created ExecutorService is the one passed
|
||||||
assertTrue(otherPool == ((HConnectionImplementation)con2).getCurrentBatchPool());
|
assertTrue(otherPool == ((ConnectionImplementation)con2).getCurrentBatchPool());
|
||||||
|
|
||||||
String tableName = "testClusterConnection";
|
String tableName = "testClusterConnection";
|
||||||
TEST_UTIL.createTable(tableName.getBytes(), FAM_NAM).close();
|
TEST_UTIL.createTable(tableName.getBytes(), FAM_NAM).close();
|
||||||
HTable t = (HTable)con1.getTable(tableName, otherPool);
|
HTable t = (HTable)con1.getTable(tableName, otherPool);
|
||||||
// make sure passing a pool to the getTable does not trigger creation of an internal pool
|
// make sure passing a pool to the getTable does not trigger creation of an internal pool
|
||||||
assertNull("Internal Thread pool should be null", ((HConnectionImplementation)con1).getCurrentBatchPool());
|
assertNull("Internal Thread pool should be null", ((ConnectionImplementation)con1).getCurrentBatchPool());
|
||||||
// table should use the pool passed
|
// table should use the pool passed
|
||||||
assertTrue(otherPool == t.getPool());
|
assertTrue(otherPool == t.getPool());
|
||||||
t.close();
|
t.close();
|
||||||
|
@ -185,7 +183,7 @@ public class TestHCM {
|
||||||
t.close();
|
t.close();
|
||||||
|
|
||||||
t = (HTable)con1.getTable(tableName);
|
t = (HTable)con1.getTable(tableName);
|
||||||
ExecutorService pool = ((HConnectionImplementation)con1).getCurrentBatchPool();
|
ExecutorService pool = ((ConnectionImplementation)con1).getCurrentBatchPool();
|
||||||
// make sure an internal pool was created
|
// make sure an internal pool was created
|
||||||
assertNotNull("An internal Thread pool should have been created", pool);
|
assertNotNull("An internal Thread pool should have been created", pool);
|
||||||
// and that the table is using it
|
// and that the table is using it
|
||||||
|
@ -244,7 +242,7 @@ public class TestHCM {
|
||||||
getRegionStates().isRegionsInTransition()){
|
getRegionStates().isRegionsInTransition()){
|
||||||
Thread.sleep(1);
|
Thread.sleep(1);
|
||||||
}
|
}
|
||||||
final HConnectionImplementation hci = (HConnectionImplementation)t.getConnection();
|
final ConnectionImplementation hci = (ConnectionImplementation)t.getConnection();
|
||||||
while (t.getRegionLocation(rk).getPort() != sn.getPort()){
|
while (t.getRegionLocation(rk).getPort() != sn.getPort()){
|
||||||
TEST_UTIL.getHBaseAdmin().move(t.getRegionLocation(rk).getRegionInfo().
|
TEST_UTIL.getHBaseAdmin().move(t.getRegionLocation(rk).getRegionInfo().
|
||||||
getEncodedNameAsBytes(), Bytes.toBytes(sn.toString()));
|
getEncodedNameAsBytes(), Bytes.toBytes(sn.toString()));
|
||||||
|
@ -388,8 +386,8 @@ public class TestHCM {
|
||||||
});
|
});
|
||||||
|
|
||||||
ServerName sn = table.getRegionLocation(ROW).getServerName();
|
ServerName sn = table.getRegionLocation(ROW).getServerName();
|
||||||
ConnectionManager.HConnectionImplementation conn =
|
ConnectionImplementation conn =
|
||||||
(ConnectionManager.HConnectionImplementation) table.getConnection();
|
(ConnectionImplementation) table.getConnection();
|
||||||
RpcClient rpcClient = conn.getRpcClient();
|
RpcClient rpcClient = conn.getRpcClient();
|
||||||
|
|
||||||
LOG.info("Going to cancel connections. connection=" + conn.toString() + ", sn=" + sn);
|
LOG.info("Going to cancel connections. connection=" + conn.toString() + ", sn=" + sn);
|
||||||
|
@ -502,7 +500,7 @@ public class TestHCM {
|
||||||
p.add(FAM_NAM, FAM_NAM, FAM_NAM);
|
p.add(FAM_NAM, FAM_NAM, FAM_NAM);
|
||||||
table.put(p);
|
table.put(p);
|
||||||
|
|
||||||
final HConnectionImplementation hci = (HConnectionImplementation)table.getConnection();
|
final ConnectionImplementation hci = (ConnectionImplementation)table.getConnection();
|
||||||
final HRegionLocation loc = table.getRegionLocation(FAM_NAM);
|
final HRegionLocation loc = table.getRegionLocation(FAM_NAM);
|
||||||
|
|
||||||
Get get = new Get(FAM_NAM);
|
Get get = new Get(FAM_NAM);
|
||||||
|
@ -573,8 +571,8 @@ public class TestHCM {
|
||||||
@Test
|
@Test
|
||||||
public void abortingHConnectionRemovesItselfFromHCM() throws Exception {
|
public void abortingHConnectionRemovesItselfFromHCM() throws Exception {
|
||||||
// Save off current HConnections
|
// Save off current HConnections
|
||||||
Map<HConnectionKey, HConnectionImplementation> oldHBaseInstances =
|
Map<HConnectionKey, ConnectionImplementation> oldHBaseInstances =
|
||||||
new HashMap<HConnectionKey, HConnectionImplementation>();
|
new HashMap<HConnectionKey, ConnectionImplementation>();
|
||||||
oldHBaseInstances.putAll(ConnectionManager.CONNECTION_INSTANCES);
|
oldHBaseInstances.putAll(ConnectionManager.CONNECTION_INSTANCES);
|
||||||
|
|
||||||
ConnectionManager.CONNECTION_INSTANCES.clear();
|
ConnectionManager.CONNECTION_INSTANCES.clear();
|
||||||
|
@ -609,8 +607,8 @@ public class TestHCM {
|
||||||
Put put = new Put(ROW);
|
Put put = new Put(ROW);
|
||||||
put.add(FAM_NAM, ROW, ROW);
|
put.add(FAM_NAM, ROW, ROW);
|
||||||
table.put(put);
|
table.put(put);
|
||||||
ConnectionManager.HConnectionImplementation conn =
|
ConnectionImplementation conn =
|
||||||
(ConnectionManager.HConnectionImplementation)table.getConnection();
|
(ConnectionImplementation)table.getConnection();
|
||||||
|
|
||||||
assertNotNull(conn.getCachedLocation(TABLE_NAME, ROW));
|
assertNotNull(conn.getCachedLocation(TABLE_NAME, ROW));
|
||||||
|
|
||||||
|
@ -810,8 +808,8 @@ public class TestHCM {
|
||||||
Put put = new Put(ROW);
|
Put put = new Put(ROW);
|
||||||
put.add(FAM_NAM, ROW, ROW);
|
put.add(FAM_NAM, ROW, ROW);
|
||||||
table.put(put);
|
table.put(put);
|
||||||
ConnectionManager.HConnectionImplementation conn =
|
ConnectionImplementation conn =
|
||||||
(ConnectionManager.HConnectionImplementation)table.getConnection();
|
(ConnectionImplementation)table.getConnection();
|
||||||
|
|
||||||
HRegionLocation location = conn.getCachedLocation(TABLE_NAME2, ROW).getRegionLocation();
|
HRegionLocation location = conn.getCachedLocation(TABLE_NAME2, ROW).getRegionLocation();
|
||||||
assertNotNull(location);
|
assertNotNull(location);
|
||||||
|
@ -940,7 +938,7 @@ public class TestHCM {
|
||||||
conn.close();
|
conn.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
private int setNumTries(HConnectionImplementation hci, int newVal) throws Exception {
|
private int setNumTries(ConnectionImplementation hci, int newVal) throws Exception {
|
||||||
Field numTries = hci.getClass().getDeclaredField("numTries");
|
Field numTries = hci.getClass().getDeclaredField("numTries");
|
||||||
numTries.setAccessible(true);
|
numTries.setAccessible(true);
|
||||||
Field modifiersField = Field.class.getDeclaredField("modifiers");
|
Field modifiersField = Field.class.getDeclaredField("modifiers");
|
||||||
|
@ -956,8 +954,8 @@ public class TestHCM {
|
||||||
public void testMulti() throws Exception {
|
public void testMulti() throws Exception {
|
||||||
HTable table = TEST_UTIL.createMultiRegionTable(TABLE_NAME3, FAM_NAM);
|
HTable table = TEST_UTIL.createMultiRegionTable(TABLE_NAME3, FAM_NAM);
|
||||||
try {
|
try {
|
||||||
ConnectionManager.HConnectionImplementation conn =
|
ConnectionImplementation conn =
|
||||||
( ConnectionManager.HConnectionImplementation)table.getConnection();
|
(ConnectionImplementation)table.getConnection();
|
||||||
|
|
||||||
// We're now going to move the region and check that it works for the client
|
// We're now going to move the region and check that it works for the client
|
||||||
// First a new put to add the location in the cache
|
// First a new put to add the location in the cache
|
||||||
|
|
|
@ -40,7 +40,6 @@ import org.apache.hadoop.hbase.ServerName;
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
import org.apache.hadoop.hbase.TableNotFoundException;
|
import org.apache.hadoop.hbase.TableNotFoundException;
|
||||||
import org.apache.hadoop.hbase.Waiter;
|
import org.apache.hadoop.hbase.Waiter;
|
||||||
import org.apache.hadoop.hbase.client.ConnectionManager.HConnectionImplementation;
|
|
||||||
import org.apache.hadoop.hbase.regionserver.StorefileRefresherChore;
|
import org.apache.hadoop.hbase.regionserver.StorefileRefresherChore;
|
||||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
@ -233,7 +232,7 @@ public class TestMetaWithReplicas {
|
||||||
}
|
}
|
||||||
Table htable = TEST_UTIL.createTable(TABLE, FAMILIES, TEST_UTIL.getConfiguration());
|
Table htable = TEST_UTIL.createTable(TABLE, FAMILIES, TEST_UTIL.getConfiguration());
|
||||||
byte[] row = "test".getBytes();
|
byte[] row = "test".getBytes();
|
||||||
HConnectionImplementation c = ((HConnectionImplementation)((HTable)htable).connection);
|
ConnectionImplementation c = ((ConnectionImplementation)((HTable)htable).connection);
|
||||||
// check that metalookup pool would get created
|
// check that metalookup pool would get created
|
||||||
c.relocateRegion(TABLE, row);
|
c.relocateRegion(TABLE, row);
|
||||||
ExecutorService ex = c.getCurrentMetaLookupPool();
|
ExecutorService ex = c.getCurrentMetaLookupPool();
|
||||||
|
|
|
@ -567,7 +567,7 @@ public class TestMultiParallel {
|
||||||
validateResult(result, QUALIFIER, Bytes.toBytes((numRequests / 2) + 1L));
|
validateResult(result, QUALIFIER, Bytes.toBytes((numRequests / 2) + 1L));
|
||||||
table.close();
|
table.close();
|
||||||
} finally {
|
} finally {
|
||||||
ConnectionManager.injectNonceGeneratorForTesting((ClusterConnection)connection, oldCnm);
|
ConnectionImplementation.injectNonceGeneratorForTesting((ClusterConnection) connection, oldCnm);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -63,4 +63,4 @@ log4j.logger.org.apache.hadoop.hbase=DEBUG
|
||||||
log4j.org.apache.hadoop.metrics2.impl.MetricsSystemImpl=ERROR
|
log4j.org.apache.hadoop.metrics2.impl.MetricsSystemImpl=ERROR
|
||||||
log4j.org.apache.hadoop.metrics2.util.MBeans=ERROR
|
log4j.org.apache.hadoop.metrics2.util.MBeans=ERROR
|
||||||
# Enable this to get detailed connection error/retry logging.
|
# Enable this to get detailed connection error/retry logging.
|
||||||
# log4j.logger.org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation=TRACE
|
# log4j.logger.org.apache.hadoop.hbase.client.ConnectionImplementation=TRACE
|
||||||
|
|
Loading…
Reference in New Issue