HBASE-14485 ConnectionImplementation leaks on construction failure
This commit is contained in:
parent
3ef56c7b64
commit
afb3b19a15
@ -631,45 +631,10 @@ class ConnectionManager {
|
||||
*/
|
||||
HConnectionImplementation(Configuration conf, boolean managed,
|
||||
ExecutorService pool, User user) throws IOException {
|
||||
this(conf);
|
||||
this.conf = conf;
|
||||
this.user = user;
|
||||
this.batchPool = pool;
|
||||
this.managed = managed;
|
||||
this.registry = setupRegistry();
|
||||
retrieveClusterId();
|
||||
|
||||
this.rpcClient = RpcClientFactory.createClient(this.conf, this.clusterId);
|
||||
this.rpcControllerFactory = RpcControllerFactory.instantiate(conf);
|
||||
|
||||
// Do we publish the status?
|
||||
boolean shouldListen = conf.getBoolean(HConstants.STATUS_PUBLISHED,
|
||||
HConstants.STATUS_PUBLISHED_DEFAULT);
|
||||
Class<? extends ClusterStatusListener.Listener> listenerClass =
|
||||
conf.getClass(ClusterStatusListener.STATUS_LISTENER_CLASS,
|
||||
ClusterStatusListener.DEFAULT_STATUS_LISTENER_CLASS,
|
||||
ClusterStatusListener.Listener.class);
|
||||
if (shouldListen) {
|
||||
if (listenerClass == null) {
|
||||
LOG.warn(HConstants.STATUS_PUBLISHED + " is true, but " +
|
||||
ClusterStatusListener.STATUS_LISTENER_CLASS + " is not set - not listening status");
|
||||
} else {
|
||||
clusterStatusListener = new ClusterStatusListener(
|
||||
new ClusterStatusListener.DeadServerHandler() {
|
||||
@Override
|
||||
public void newDead(ServerName sn) {
|
||||
clearCaches(sn);
|
||||
rpcClient.cancelConnections(sn);
|
||||
}
|
||||
}, conf, listenerClass);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* For tests.
|
||||
*/
|
||||
protected HConnectionImplementation(Configuration conf) {
|
||||
this.conf = conf;
|
||||
this.tableConfig = new TableConfiguration(conf);
|
||||
this.closed = false;
|
||||
this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
|
||||
@ -690,11 +655,49 @@ class ConnectionManager {
|
||||
} else {
|
||||
this.nonceGenerator = new NoNonceGenerator();
|
||||
}
|
||||
stats = ServerStatisticTracker.create(conf);
|
||||
this.asyncProcess = createAsyncProcess(this.conf);
|
||||
|
||||
this.stats = ServerStatisticTracker.create(conf);
|
||||
this.interceptor = (new RetryingCallerInterceptorFactory(conf)).build();
|
||||
this.rpcControllerFactory = RpcControllerFactory.instantiate(conf);
|
||||
this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf, interceptor, this.stats);
|
||||
this.backoffPolicy = ClientBackoffPolicyFactory.create(conf);
|
||||
this.asyncProcess = createAsyncProcess(this.conf);
|
||||
|
||||
boolean shouldListen = conf.getBoolean(HConstants.STATUS_PUBLISHED,
|
||||
HConstants.STATUS_PUBLISHED_DEFAULT);
|
||||
Class<? extends ClusterStatusListener.Listener> listenerClass =
|
||||
conf.getClass(ClusterStatusListener.STATUS_LISTENER_CLASS,
|
||||
ClusterStatusListener.DEFAULT_STATUS_LISTENER_CLASS,
|
||||
ClusterStatusListener.Listener.class);
|
||||
|
||||
try {
|
||||
this.registry = setupRegistry();
|
||||
retrieveClusterId();
|
||||
|
||||
this.rpcClient = RpcClientFactory.createClient(this.conf, this.clusterId);
|
||||
|
||||
// Do we publish the status?
|
||||
if (shouldListen) {
|
||||
if (listenerClass == null) {
|
||||
LOG.warn(HConstants.STATUS_PUBLISHED + " is true, but " +
|
||||
ClusterStatusListener.STATUS_LISTENER_CLASS + " is not set - not listening status");
|
||||
} else {
|
||||
clusterStatusListener = new ClusterStatusListener(
|
||||
new ClusterStatusListener.DeadServerHandler() {
|
||||
@Override
|
||||
public void newDead(ServerName sn) {
|
||||
clearCaches(sn);
|
||||
rpcClient.cancelConnections(sn);
|
||||
}
|
||||
}, conf, listenerClass);
|
||||
}
|
||||
}
|
||||
} catch (Throwable e) {
|
||||
// avoid leaks: registry, rpcClient, ...
|
||||
LOG.debug("connection construction failed", e);
|
||||
close();
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -2262,9 +2265,7 @@ class ConnectionManager {
|
||||
// For tests to override.
|
||||
protected AsyncProcess createAsyncProcess(Configuration conf) {
|
||||
// No default pool available.
|
||||
return new AsyncProcess(this, conf, this.batchPool,
|
||||
RpcRetryingCallerFactory.instantiate(conf, this.getStatisticsTracker()), false,
|
||||
RpcControllerFactory.instantiate(conf));
|
||||
return new AsyncProcess(this, conf, batchPool, rpcCallerFactory, false, rpcControllerFactory);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -26,13 +26,15 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
class RegistryFactory {
|
||||
static final String REGISTRY_IMPL_CONF_KEY = "hbase.client.registry.impl";
|
||||
|
||||
/**
|
||||
* @return The cluster registry implementation to use.
|
||||
* @throws IOException
|
||||
*/
|
||||
static Registry getRegistry(final Connection connection)
|
||||
throws IOException {
|
||||
String registryClass = connection.getConfiguration().get("hbase.client.registry.impl",
|
||||
String registryClass = connection.getConfiguration().get(REGISTRY_IMPL_CONF_KEY,
|
||||
ZooKeeperRegistry.class.getName());
|
||||
Registry registry = null;
|
||||
try {
|
||||
|
@ -341,11 +341,40 @@ public class TestAsyncProcess {
|
||||
* Returns our async process.
|
||||
*/
|
||||
static class MyConnectionImpl extends ConnectionManager.HConnectionImplementation {
|
||||
public static class TestRegistry implements Registry {
|
||||
@Override
|
||||
public void init(Connection connection) {}
|
||||
|
||||
@Override
|
||||
public RegionLocations getMetaRegionLocation() throws IOException {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getClusterId() {
|
||||
return "testClusterId";
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isTableOnlineState(TableName tableName, boolean enabled) throws IOException {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getCurrentNrHRS() throws IOException {
|
||||
return 1;
|
||||
}
|
||||
}
|
||||
|
||||
final AtomicInteger nbThreads = new AtomicInteger(0);
|
||||
|
||||
protected MyConnectionImpl(Configuration conf) throws IOException {
|
||||
super(setupConf(conf), false);
|
||||
}
|
||||
|
||||
protected MyConnectionImpl(Configuration conf) {
|
||||
super(conf);
|
||||
private static Configuration setupConf(Configuration conf) {
|
||||
conf.setClass(RegistryFactory.REGISTRY_IMPL_CONF_KEY, TestRegistry.class, Registry.class);
|
||||
return conf;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -362,7 +391,7 @@ public class TestAsyncProcess {
|
||||
List<HRegionLocation> hrl;
|
||||
final boolean usedRegions[];
|
||||
|
||||
protected MyConnectionImpl2(List<HRegionLocation> hrl) {
|
||||
protected MyConnectionImpl2(List<HRegionLocation> hrl) throws IOException {
|
||||
super(conf);
|
||||
this.hrl = hrl;
|
||||
this.usedRegions = new boolean[hrl.size()];
|
||||
@ -381,7 +410,6 @@ public class TestAsyncProcess {
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Rule
|
||||
|
Loading…
x
Reference in New Issue
Block a user