diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java index 2152c2085a4..44c9edc2bf1 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java @@ -631,45 +631,10 @@ class ConnectionManager { */ HConnectionImplementation(Configuration conf, boolean managed, ExecutorService pool, User user) throws IOException { - this(conf); + this.conf = conf; this.user = user; this.batchPool = pool; this.managed = managed; - this.registry = setupRegistry(); - retrieveClusterId(); - - this.rpcClient = RpcClientFactory.createClient(this.conf, this.clusterId); - this.rpcControllerFactory = RpcControllerFactory.instantiate(conf); - - // Do we publish the status? - boolean shouldListen = conf.getBoolean(HConstants.STATUS_PUBLISHED, - HConstants.STATUS_PUBLISHED_DEFAULT); - Class listenerClass = - conf.getClass(ClusterStatusListener.STATUS_LISTENER_CLASS, - ClusterStatusListener.DEFAULT_STATUS_LISTENER_CLASS, - ClusterStatusListener.Listener.class); - if (shouldListen) { - if (listenerClass == null) { - LOG.warn(HConstants.STATUS_PUBLISHED + " is true, but " + - ClusterStatusListener.STATUS_LISTENER_CLASS + " is not set - not listening status"); - } else { - clusterStatusListener = new ClusterStatusListener( - new ClusterStatusListener.DeadServerHandler() { - @Override - public void newDead(ServerName sn) { - clearCaches(sn); - rpcClient.cancelConnections(sn); - } - }, conf, listenerClass); - } - } - } - - /** - * For tests. - */ - protected HConnectionImplementation(Configuration conf) { - this.conf = conf; this.tableConfig = new TableConfiguration(conf); this.closed = false; this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE, @@ -690,11 +655,49 @@ class ConnectionManager { } else { this.nonceGenerator = new NoNonceGenerator(); } - stats = ServerStatisticTracker.create(conf); - this.asyncProcess = createAsyncProcess(this.conf); + + this.stats = ServerStatisticTracker.create(conf); this.interceptor = (new RetryingCallerInterceptorFactory(conf)).build(); + this.rpcControllerFactory = RpcControllerFactory.instantiate(conf); this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf, interceptor, this.stats); this.backoffPolicy = ClientBackoffPolicyFactory.create(conf); + this.asyncProcess = createAsyncProcess(this.conf); + + boolean shouldListen = conf.getBoolean(HConstants.STATUS_PUBLISHED, + HConstants.STATUS_PUBLISHED_DEFAULT); + Class listenerClass = + conf.getClass(ClusterStatusListener.STATUS_LISTENER_CLASS, + ClusterStatusListener.DEFAULT_STATUS_LISTENER_CLASS, + ClusterStatusListener.Listener.class); + + try { + this.registry = setupRegistry(); + retrieveClusterId(); + + this.rpcClient = RpcClientFactory.createClient(this.conf, this.clusterId); + + // Do we publish the status? + if (shouldListen) { + if (listenerClass == null) { + LOG.warn(HConstants.STATUS_PUBLISHED + " is true, but " + + ClusterStatusListener.STATUS_LISTENER_CLASS + " is not set - not listening status"); + } else { + clusterStatusListener = new ClusterStatusListener( + new ClusterStatusListener.DeadServerHandler() { + @Override + public void newDead(ServerName sn) { + clearCaches(sn); + rpcClient.cancelConnections(sn); + } + }, conf, listenerClass); + } + } + } catch (Throwable e) { + // avoid leaks: registry, rpcClient, ... + LOG.debug("connection construction failed", e); + close(); + throw e; + } } @Override @@ -2262,9 +2265,7 @@ class ConnectionManager { // For tests to override. protected AsyncProcess createAsyncProcess(Configuration conf) { // No default pool available. - return new AsyncProcess(this, conf, this.batchPool, - RpcRetryingCallerFactory.instantiate(conf, this.getStatisticsTracker()), false, - RpcControllerFactory.instantiate(conf)); + return new AsyncProcess(this, conf, batchPool, rpcCallerFactory, false, rpcControllerFactory); } @Override diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegistryFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegistryFactory.java index dc2cb7c72f0..d7aa73900dc 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegistryFactory.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegistryFactory.java @@ -26,13 +26,15 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience; */ @InterfaceAudience.Private class RegistryFactory { + static final String REGISTRY_IMPL_CONF_KEY = "hbase.client.registry.impl"; + /** * @return The cluster registry implementation to use. * @throws IOException */ static Registry getRegistry(final Connection connection) throws IOException { - String registryClass = connection.getConfiguration().get("hbase.client.registry.impl", + String registryClass = connection.getConfiguration().get(REGISTRY_IMPL_CONF_KEY, ZooKeeperRegistry.class.getName()); Registry registry = null; try { diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java index e2e435d46a9..5c217402265 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java @@ -341,11 +341,40 @@ public class TestAsyncProcess { * Returns our async process. */ static class MyConnectionImpl extends ConnectionManager.HConnectionImplementation { + public static class TestRegistry implements Registry { + @Override + public void init(Connection connection) {} + + @Override + public RegionLocations getMetaRegionLocation() throws IOException { + return null; + } + + @Override + public String getClusterId() { + return "testClusterId"; + } + + @Override + public boolean isTableOnlineState(TableName tableName, boolean enabled) throws IOException { + return false; + } + + @Override + public int getCurrentNrHRS() throws IOException { + return 1; + } + } + final AtomicInteger nbThreads = new AtomicInteger(0); + protected MyConnectionImpl(Configuration conf) throws IOException { + super(setupConf(conf), false); + } - protected MyConnectionImpl(Configuration conf) { - super(conf); + private static Configuration setupConf(Configuration conf) { + conf.setClass(RegistryFactory.REGISTRY_IMPL_CONF_KEY, TestRegistry.class, Registry.class); + return conf; } @Override @@ -362,7 +391,7 @@ public class TestAsyncProcess { List hrl; final boolean usedRegions[]; - protected MyConnectionImpl2(List hrl) { + protected MyConnectionImpl2(List hrl) throws IOException { super(conf); this.hrl = hrl; this.usedRegions = new boolean[hrl.size()]; @@ -381,7 +410,6 @@ public class TestAsyncProcess { } return null; } - } @Rule