HBASE-1800 Too many ZK connections
git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@809414 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
ca71fd3eab
commit
61fa816157
|
@ -27,6 +27,7 @@ Release 0.21.0 - Unreleased
|
|||
HBASE-1776 Make rowcounter enum public
|
||||
HBASE-1276 [testing] Upgrade to JUnit 4.x and use @BeforeClass
|
||||
annotations to optimize tests
|
||||
HBASE-1800 Too many ZK connections
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
|
|
|
@ -79,6 +79,9 @@ public class HConnectionManager implements HConstants {
|
|||
final Map<HBaseConfiguration, TableServers> HBASE_INSTANCES =
|
||||
new WeakHashMap<HBaseConfiguration, TableServers>();
|
||||
|
||||
private static final Map<String, ClientZKWatcher> ZK_WRAPPERS =
|
||||
new HashMap<String, ClientZKWatcher>();
|
||||
|
||||
/**
|
||||
* Get the connection object for the instance specified by the configuration
|
||||
* If no current connection exists, create a new connection for that instance
|
||||
|
@ -124,10 +127,93 @@ public class HConnectionManager implements HConstants {
|
|||
}
|
||||
}
|
||||
}
|
||||
synchronized (ZK_WRAPPERS) {
|
||||
for (ClientZKWatcher watch : ZK_WRAPPERS.values()) {
|
||||
watch.resetZooKeeper();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a watcher of a zookeeper connection for a given quorum address.
|
||||
* If the connection isn't established, a new one is created.
|
||||
* This acts like a multiton.
|
||||
* @param conf
|
||||
* @return ZKW watcher
|
||||
* @throws IOException
|
||||
*/
|
||||
public static synchronized ClientZKWatcher getClientZooKeeperWatcher(
|
||||
HBaseConfiguration conf) throws IOException {
|
||||
if (!ZK_WRAPPERS.containsKey(conf.get(HConstants.ZOOKEEPER_QUORUM))) {
|
||||
ZK_WRAPPERS.put(conf.get(HConstants.ZOOKEEPER_QUORUM),
|
||||
new ClientZKWatcher(conf));
|
||||
}
|
||||
return ZK_WRAPPERS.get(conf.get(HConstants.ZOOKEEPER_QUORUM));
|
||||
}
|
||||
|
||||
/**
|
||||
* This class is responsible to handle connection and reconnection
|
||||
* to a zookeeper quorum.
|
||||
*
|
||||
*/
|
||||
public static class ClientZKWatcher implements Watcher {
|
||||
|
||||
static final Log LOG = LogFactory.getLog(ClientZKWatcher.class);
|
||||
private ZooKeeperWrapper zooKeeperWrapper;
|
||||
private HBaseConfiguration conf;
|
||||
|
||||
/**
|
||||
* Takes a configuration to pass it to ZKW but won't instanciate it
|
||||
* @param conf
|
||||
* @throws IOException
|
||||
*/
|
||||
public ClientZKWatcher(HBaseConfiguration conf) {
|
||||
this.conf = conf;
|
||||
}
|
||||
|
||||
/**
|
||||
* Called by ZooKeeper when an event occurs on our connection. We use this to
|
||||
* detect our session expiring. When our session expires, we have lost our
|
||||
* connection to ZooKeeper. Our handle is dead, and we need to recreate it.
|
||||
*
|
||||
* See http://hadoop.apache.org/zookeeper/docs/current/zookeeperProgrammers.html#ch_zkSessions
|
||||
* for more information.
|
||||
*
|
||||
* @param event WatchedEvent witnessed by ZooKeeper.
|
||||
*/
|
||||
public void process(WatchedEvent event) {
|
||||
KeeperState state = event.getState();
|
||||
LOG.debug("Got ZooKeeper event, state: " + state + ", type: "
|
||||
+ event.getType() + ", path: " + event.getPath());
|
||||
if (state == KeeperState.Expired) {
|
||||
resetZooKeeper();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get this watcher's ZKW, instanciate it if necessary.
|
||||
* @return ZKW
|
||||
*/
|
||||
public ZooKeeperWrapper getZooKeeperWrapper() throws IOException {
|
||||
if(zooKeeperWrapper == null) {
|
||||
zooKeeperWrapper = new ZooKeeperWrapper(conf, this);
|
||||
}
|
||||
return zooKeeperWrapper;
|
||||
}
|
||||
|
||||
/**
|
||||
* Clear this connection to zookeeper.
|
||||
*/
|
||||
private synchronized void resetZooKeeper() {
|
||||
if (zooKeeperWrapper != null) {
|
||||
zooKeeperWrapper.close();
|
||||
zooKeeperWrapper = null;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/* Encapsulates finding the servers for an HBase instance */
|
||||
private static class TableServers implements ServerConnection, HConstants, Watcher {
|
||||
private static class TableServers implements ServerConnection, HConstants {
|
||||
static final Log LOG = LogFactory.getLog(TableServers.class);
|
||||
private final Class<? extends HRegionInterface> serverInterfaceClass;
|
||||
private final long pause;
|
||||
|
@ -157,8 +243,6 @@ public class HConnectionManager implements HConstants {
|
|||
cachedRegionLocations =
|
||||
new HashMap<Integer, SoftValueSortedMap<byte [], HRegionLocation>>();
|
||||
|
||||
private ZooKeeperWrapper zooKeeperWrapper;
|
||||
|
||||
/**
|
||||
* constructor
|
||||
* @param conf Configuration object
|
||||
|
@ -197,32 +281,6 @@ public class HConnectionManager implements HConstants {
|
|||
return this.pause * HConstants.RETRY_BACKOFF[ntries];
|
||||
}
|
||||
|
||||
/**
|
||||
* Called by ZooKeeper when an event occurs on our connection. We use this to
|
||||
* detect our session expiring. When our session expires, we have lost our
|
||||
* connection to ZooKeeper. Our handle is dead, and we need to recreate it.
|
||||
*
|
||||
* See http://hadoop.apache.org/zookeeper/docs/current/zookeeperProgrammers.html#ch_zkSessions
|
||||
* for more information.
|
||||
*
|
||||
* @param event WatchedEvent witnessed by ZooKeeper.
|
||||
*/
|
||||
public void process(WatchedEvent event) {
|
||||
KeeperState state = event.getState();
|
||||
LOG.debug("Got ZooKeeper event, state: " + state + ", type: " +
|
||||
event.getType() + ", path: " + event.getPath());
|
||||
if (state == KeeperState.Expired) {
|
||||
resetZooKeeper();
|
||||
}
|
||||
}
|
||||
|
||||
private synchronized void resetZooKeeper() {
|
||||
if (zooKeeperWrapper != null) {
|
||||
zooKeeperWrapper.close();
|
||||
zooKeeperWrapper = null;
|
||||
}
|
||||
}
|
||||
|
||||
// Used by master and region servers during safe mode only
|
||||
public void unsetRootRegionLocation() {
|
||||
this.rootRegionLocation = null;
|
||||
|
@ -814,11 +872,10 @@ public class HConnectionManager implements HConstants {
|
|||
return getHRegionConnection(regionServer, false);
|
||||
}
|
||||
|
||||
public synchronized ZooKeeperWrapper getZooKeeperWrapper() throws IOException {
|
||||
if (zooKeeperWrapper == null) {
|
||||
zooKeeperWrapper = new ZooKeeperWrapper(conf, this);
|
||||
}
|
||||
return zooKeeperWrapper;
|
||||
public synchronized ZooKeeperWrapper getZooKeeperWrapper()
|
||||
throws IOException {
|
||||
return HConnectionManager.getClientZooKeeperWatcher(conf)
|
||||
.getZooKeeperWrapper();
|
||||
}
|
||||
|
||||
/*
|
||||
|
@ -1084,7 +1141,6 @@ public class HConnectionManager implements HConstants {
|
|||
master = null;
|
||||
masterChecked = false;
|
||||
}
|
||||
resetZooKeeper();
|
||||
if (stopProxy) {
|
||||
synchronized (servers) {
|
||||
for (HRegionInterface i: servers.values()) {
|
||||
|
|
|
@ -21,6 +21,7 @@ package org.apache.hadoop.hbase;
|
|||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.hbase.client.Get;
|
||||
import org.apache.hadoop.hbase.client.HBaseAdmin;
|
||||
import org.apache.hadoop.hbase.client.HConnection;
|
||||
import org.apache.hadoop.hbase.client.HConnectionManager;
|
||||
|
@ -150,4 +151,28 @@ public class TestZooKeeper extends HBaseClusterTestCase {
|
|||
fail();
|
||||
}
|
||||
}
|
||||
|
||||
public void testMultipleZK() {
|
||||
try {
|
||||
HTable localMeta = new HTable(conf, HConstants.META_TABLE_NAME);
|
||||
HBaseConfiguration otherConf = new HBaseConfiguration(conf);
|
||||
otherConf.set(HConstants.ZOOKEEPER_QUORUM, "127.0.0.1");
|
||||
HTable ipMeta = new HTable(conf, HConstants.META_TABLE_NAME);
|
||||
|
||||
// dummy, just to open the connection
|
||||
localMeta.exists(new Get(HConstants.LAST_ROW));
|
||||
ipMeta.exists(new Get(HConstants.LAST_ROW));
|
||||
|
||||
// make sure they aren't the same
|
||||
assertFalse(HConnectionManager.getClientZooKeeperWatcher(conf)
|
||||
.getZooKeeperWrapper() == HConnectionManager.getClientZooKeeperWatcher(
|
||||
otherConf).getZooKeeperWrapper());
|
||||
assertFalse(HConnectionManager.getConnection(conf)
|
||||
.getZooKeeperWrapper().getQuorumServers().equals(HConnectionManager
|
||||
.getConnection(otherConf).getZooKeeperWrapper().getQuorumServers()));
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
fail();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue