HBASE-14485 ConnectionImplementation leaks on construction failure

This commit is contained in:
Matteo Bertozzi 2015-10-01 12:56:37 -07:00
parent 4da3c935d4
commit 2e8575bb0f
3 changed files with 73 additions and 48 deletions

View File

@ -182,44 +182,9 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
*/ */
ConnectionImplementation(Configuration conf, ConnectionImplementation(Configuration conf,
ExecutorService pool, User user) throws IOException { ExecutorService pool, User user) throws IOException {
this(conf); this.conf = conf;
this.user = user; this.user = user;
this.batchPool = pool; this.batchPool = pool;
this.registry = setupRegistry();
retrieveClusterId();
this.rpcClient = RpcClientFactory.createClient(this.conf, this.clusterId);
this.rpcControllerFactory = RpcControllerFactory.instantiate(conf);
// Do we publish the status?
boolean shouldListen = conf.getBoolean(HConstants.STATUS_PUBLISHED,
HConstants.STATUS_PUBLISHED_DEFAULT);
Class<? extends ClusterStatusListener.Listener> listenerClass =
conf.getClass(ClusterStatusListener.STATUS_LISTENER_CLASS,
ClusterStatusListener.DEFAULT_STATUS_LISTENER_CLASS,
ClusterStatusListener.Listener.class);
if (shouldListen) {
if (listenerClass == null) {
LOG.warn(HConstants.STATUS_PUBLISHED + " is true, but " +
ClusterStatusListener.STATUS_LISTENER_CLASS + " is not set - not listening status");
} else {
clusterStatusListener = new ClusterStatusListener(
new ClusterStatusListener.DeadServerHandler() {
@Override
public void newDead(ServerName sn) {
clearCaches(sn);
rpcClient.cancelConnections(sn);
}
}, conf, listenerClass);
}
}
}
/**
* For tests.
*/
protected ConnectionImplementation(Configuration conf) {
this.conf = conf;
this.tableConfig = new TableConfiguration(conf); this.tableConfig = new TableConfiguration(conf);
this.closed = false; this.closed = false;
this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE, this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
@ -239,11 +204,49 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
} else { } else {
nonceGenerator = new NoNonceGenerator(); nonceGenerator = new NoNonceGenerator();
} }
stats = ServerStatisticTracker.create(conf);
this.asyncProcess = createAsyncProcess(this.conf); this.stats = ServerStatisticTracker.create(conf);
this.interceptor = (new RetryingCallerInterceptorFactory(conf)).build(); this.interceptor = (new RetryingCallerInterceptorFactory(conf)).build();
this.rpcControllerFactory = RpcControllerFactory.instantiate(conf);
this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf, interceptor, this.stats); this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf, interceptor, this.stats);
this.backoffPolicy = ClientBackoffPolicyFactory.create(conf); this.backoffPolicy = ClientBackoffPolicyFactory.create(conf);
this.asyncProcess = createAsyncProcess(this.conf);
boolean shouldListen = conf.getBoolean(HConstants.STATUS_PUBLISHED,
HConstants.STATUS_PUBLISHED_DEFAULT);
Class<? extends ClusterStatusListener.Listener> listenerClass =
conf.getClass(ClusterStatusListener.STATUS_LISTENER_CLASS,
ClusterStatusListener.DEFAULT_STATUS_LISTENER_CLASS,
ClusterStatusListener.Listener.class);
try {
this.registry = setupRegistry();
retrieveClusterId();
this.rpcClient = RpcClientFactory.createClient(this.conf, this.clusterId);
// Do we publish the status?
if (shouldListen) {
if (listenerClass == null) {
LOG.warn(HConstants.STATUS_PUBLISHED + " is true, but " +
ClusterStatusListener.STATUS_LISTENER_CLASS + " is not set - not listening status");
} else {
clusterStatusListener = new ClusterStatusListener(
new ClusterStatusListener.DeadServerHandler() {
@Override
public void newDead(ServerName sn) {
clearCaches(sn);
rpcClient.cancelConnections(sn);
}
}, conf, listenerClass);
}
}
} catch (Throwable e) {
// avoid leaks: registry, rpcClient, ...
LOG.debug("connection construction failed", e);
close();
throw e;
}
} }
/** /**
@ -478,7 +481,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
protected String clusterId = null; protected String clusterId = null;
void retrieveClusterId() { protected void retrieveClusterId() {
if (clusterId != null) return; if (clusterId != null) return;
this.clusterId = this.registry.getClusterId(); this.clusterId = this.registry.getClusterId();
if (clusterId == null) { if (clusterId == null) {
@ -1979,9 +1982,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
// For tests to override. // For tests to override.
protected AsyncProcess createAsyncProcess(Configuration conf) { protected AsyncProcess createAsyncProcess(Configuration conf) {
// No default pool available. // No default pool available.
return new AsyncProcess(this, conf, this.batchPool, return new AsyncProcess(this, conf, batchPool, rpcCallerFactory, false, rpcControllerFactory);
RpcRetryingCallerFactory.instantiate(conf, this.getStatisticsTracker()), false,
RpcControllerFactory.instantiate(conf));
} }
@Override @Override

View File

@ -26,6 +26,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
final class RegistryFactory { final class RegistryFactory {
static final String REGISTRY_IMPL_CONF_KEY = "hbase.client.registry.impl";
private RegistryFactory() {} private RegistryFactory() {}
@ -35,7 +36,7 @@ final class RegistryFactory {
*/ */
static Registry getRegistry(final Connection connection) static Registry getRegistry(final Connection connection)
throws IOException { throws IOException {
String registryClass = connection.getConfiguration().get("hbase.client.registry.impl", String registryClass = connection.getConfiguration().get(REGISTRY_IMPL_CONF_KEY,
ZooKeeperRegistry.class.getName()); ZooKeeperRegistry.class.getName());
Registry registry = null; Registry registry = null;
try { try {

View File

@ -342,11 +342,35 @@ public class TestAsyncProcess {
* Returns our async process. * Returns our async process.
*/ */
static class MyConnectionImpl extends ConnectionImplementation { static class MyConnectionImpl extends ConnectionImplementation {
public static class TestRegistry implements Registry {
@Override
public void init(Connection connection) {}
@Override
public RegionLocations getMetaRegionLocation() throws IOException {
return null;
}
@Override
public String getClusterId() {
return "testClusterId";
}
@Override
public int getCurrentNrHRS() throws IOException {
return 1;
}
}
final AtomicInteger nbThreads = new AtomicInteger(0); final AtomicInteger nbThreads = new AtomicInteger(0);
protected MyConnectionImpl(Configuration conf) throws IOException {
super(setupConf(conf), null, null);
}
protected MyConnectionImpl(Configuration conf) { private static Configuration setupConf(Configuration conf) {
super(conf); conf.setClass(RegistryFactory.REGISTRY_IMPL_CONF_KEY, TestRegistry.class, Registry.class);
return conf;
} }
@Override @Override
@ -363,7 +387,7 @@ public class TestAsyncProcess {
List<HRegionLocation> hrl; List<HRegionLocation> hrl;
final boolean usedRegions[]; final boolean usedRegions[];
protected MyConnectionImpl2(List<HRegionLocation> hrl) { protected MyConnectionImpl2(List<HRegionLocation> hrl) throws IOException {
super(conf); super(conf);
this.hrl = hrl; this.hrl = hrl;
this.usedRegions = new boolean[hrl.size()]; this.usedRegions = new boolean[hrl.size()];
@ -382,7 +406,6 @@ public class TestAsyncProcess {
} }
return null; return null;
} }
} }
@Rule @Rule