HBASE-1232 zookeeper client wont reconnect if there is a problem
git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@759821 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
c150f02537
commit
e1888e57f5
|
@ -66,6 +66,8 @@ Release 0.20.0 - Unreleased
|
|||
HBASE-1293 hfile doesn't recycle decompressors (Ryan Rawson via Andrew
|
||||
Purtell)
|
||||
HBASE-1150 HMsg carries safemode flag; remove (Nitay Joffe via Stack)
|
||||
HBASE-1232 zookeeper client wont reconnect if there is a problem (Nitay
|
||||
Joffe via Andrew Purtell)
|
||||
|
||||
IMPROVEMENTS
|
||||
HBASE-1089 Add count of regions on filesystem to master UI; add percentage
|
||||
|
|
|
@ -29,12 +29,20 @@ import org.apache.hadoop.hbase.MasterNotRunningException;
|
|||
import org.apache.hadoop.hbase.io.BatchUpdate;
|
||||
import org.apache.hadoop.hbase.ipc.HMasterInterface;
|
||||
import org.apache.hadoop.hbase.ipc.HRegionInterface;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
|
||||
|
||||
/**
|
||||
* Cluster connection.
|
||||
* {@link HConnectionManager} manages instances of this class.
|
||||
*/
|
||||
public interface HConnection {
|
||||
/**
|
||||
* Retrieve ZooKeeperWrapper used by the connection.
|
||||
* @return ZooKeeperWrapper handle being used by the connection.
|
||||
* @throws IOException
|
||||
*/
|
||||
public ZooKeeperWrapper getZooKeeperWrapper() throws IOException;
|
||||
|
||||
/**
|
||||
* @return proxy connection to master server for this instance
|
||||
* @throws MasterNotRunningException
|
||||
|
|
|
@ -57,6 +57,9 @@ import org.apache.hadoop.hbase.util.SoftValueSortedMap;
|
|||
import org.apache.hadoop.hbase.util.Writables;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
|
||||
import org.apache.hadoop.ipc.RemoteException;
|
||||
import org.apache.zookeeper.WatchedEvent;
|
||||
import org.apache.zookeeper.Watcher;
|
||||
import org.apache.zookeeper.Watcher.Event.KeeperState;
|
||||
|
||||
/**
|
||||
* A non-instantiable class that manages connections to multiple tables in
|
||||
|
@ -114,7 +117,7 @@ public class HConnectionManager implements HConstants {
|
|||
}
|
||||
|
||||
/* Encapsulates finding the servers for an HBase instance */
|
||||
private static class TableServers implements ServerConnection, HConstants {
|
||||
private static class TableServers implements ServerConnection, HConstants, Watcher {
|
||||
private static final Log LOG = LogFactory.getLog(TableServers.class);
|
||||
private final Class<? extends HRegionInterface> serverInterfaceClass;
|
||||
private final long pause;
|
||||
|
@ -182,6 +185,29 @@ public class HConnectionManager implements HConstants {
|
|||
return this.pause * HConstants.RETRY_BACKOFF[ntries];
|
||||
}
|
||||
|
||||
/**
|
||||
* Called by ZooKeeper when an event occurs on our connection. We use this to
|
||||
* detect our session expiring. When our session expires, we have lost our
|
||||
* connection to ZooKeeper. Our handle is dead, and we need to recreate it.
|
||||
*
|
||||
* See http://hadoop.apache.org/zookeeper/docs/current/zookeeperProgrammers.html#ch_zkSessions
|
||||
* for more information.
|
||||
*
|
||||
* @param event WatchedEvent witnessed by ZooKeeper.
|
||||
*/
|
||||
public void process(WatchedEvent event) {
|
||||
KeeperState state = event.getState();
|
||||
LOG.debug("Got ZooKeeper event, state: " + state + ", type: " +
|
||||
event.getType() + ", path: " + event.getPath());
|
||||
if (state == KeeperState.Expired) {
|
||||
resetZooKeeper();
|
||||
}
|
||||
}
|
||||
|
||||
private synchronized void resetZooKeeper() {
|
||||
zooKeeperWrapper = null;
|
||||
}
|
||||
|
||||
// Used by master and region servers during safe mode only
|
||||
public void unsetRootRegionLocation() {
|
||||
this.rootRegionLocation = null;
|
||||
|
@ -197,8 +223,9 @@ public class HConnectionManager implements HConstants {
|
|||
}
|
||||
|
||||
public HMasterInterface getMaster() throws MasterNotRunningException {
|
||||
ZooKeeperWrapper zk = null;
|
||||
try {
|
||||
getZooKeeperWrapper();
|
||||
zk = getZooKeeperWrapper();
|
||||
} catch (IOException e) {
|
||||
throw new MasterNotRunningException(e);
|
||||
}
|
||||
|
@ -212,7 +239,7 @@ public class HConnectionManager implements HConstants {
|
|||
tries++) {
|
||||
|
||||
try {
|
||||
masterLocation = zooKeeperWrapper.readMasterAddressOrThrow();
|
||||
masterLocation = zk.readMasterAddressOrThrow();
|
||||
|
||||
HMasterInterface tryMaster = (HMasterInterface)HBaseRPC.getProxy(
|
||||
HMasterInterface.class, HBaseRPCProtocolVersion.versionID,
|
||||
|
@ -758,9 +785,9 @@ public class HConnectionManager implements HConstants {
|
|||
return server;
|
||||
}
|
||||
|
||||
private synchronized ZooKeeperWrapper getZooKeeperWrapper() throws IOException {
|
||||
public synchronized ZooKeeperWrapper getZooKeeperWrapper() throws IOException {
|
||||
if (zooKeeperWrapper == null) {
|
||||
zooKeeperWrapper = new ZooKeeperWrapper(conf);
|
||||
zooKeeperWrapper = new ZooKeeperWrapper(conf, this);
|
||||
}
|
||||
return zooKeeperWrapper;
|
||||
}
|
||||
|
@ -778,7 +805,7 @@ public class HConnectionManager implements HConstants {
|
|||
|
||||
// We lazily instantiate the ZooKeeper object because we don't want to
|
||||
// make the constructor have to throw IOException or handle it itself.
|
||||
ZooKeeperWrapper zooKeeperWrapper = getZooKeeperWrapper();
|
||||
ZooKeeperWrapper zk = getZooKeeperWrapper();
|
||||
|
||||
HServerAddress rootRegionAddress = null;
|
||||
for (int tries = 0; tries < numRetries; tries++) {
|
||||
|
@ -787,9 +814,9 @@ public class HConnectionManager implements HConstants {
|
|||
while (rootRegionAddress == null && localTimeouts < numRetries) {
|
||||
// Don't read root region until we're out of safe mode so we know
|
||||
// that the meta regions have been assigned.
|
||||
boolean outOfSafeMode = zooKeeperWrapper.checkOutOfSafeMode();
|
||||
boolean outOfSafeMode = zk.checkOutOfSafeMode();
|
||||
if (outOfSafeMode) {
|
||||
rootRegionAddress = zooKeeperWrapper.readRootRegionLocation();
|
||||
rootRegionAddress = zk.readRootRegionLocation();
|
||||
}
|
||||
if (rootRegionAddress == null) {
|
||||
try {
|
||||
|
|
|
@ -113,6 +113,24 @@ public class ZooKeeperWrapper implements HConstants {
|
|||
masterElectionZNode = getZNode(parentZNode, masterAddressZNodeName);
|
||||
}
|
||||
|
||||
/**
|
||||
* This is for testing KeeperException.SessionExpiredException.
|
||||
* See HBASE-1232.
|
||||
* @return long session ID of this ZooKeeper session.
|
||||
*/
|
||||
public long getSessionID() {
|
||||
return zooKeeper.getSessionId();
|
||||
}
|
||||
|
||||
/**
|
||||
* This is for testing KeeperException.SessionExpiredException.
|
||||
* See HBASE-1232.
|
||||
* @return byte[] password of this ZooKeeper session.
|
||||
*/
|
||||
public byte[] getSessionPassword() {
|
||||
return zooKeeper.getSessionPasswd();
|
||||
}
|
||||
|
||||
/**
|
||||
* This is for tests to directly set the ZooKeeper quorum servers.
|
||||
* @param servers comma separated host:port ZooKeeper quorum servers.
|
||||
|
|
|
@ -564,7 +564,7 @@ public abstract class HBaseTestCase extends TestCase {
|
|||
*
|
||||
* Sets the configuration parameter TEST_DIRECTORY_KEY if not already set.
|
||||
* Sets the boolean debugging if "DEBUGGING" is set in the environment.
|
||||
* If debugging is enabled, reconfigures loggin so that the root log level is
|
||||
* If debugging is enabled, reconfigures logging so that the root log level is
|
||||
* set to WARN and the logging level for the package is set to DEBUG.
|
||||
*/
|
||||
public static void initialize() {
|
||||
|
|
|
@ -58,16 +58,15 @@ public class MiniZooKeeperCluster {
|
|||
private boolean started;
|
||||
private int numPeers;
|
||||
private File baseDir;
|
||||
private String quorumServers;
|
||||
|
||||
// for distributed mode.
|
||||
private QuorumPeer[] quorumPeers;
|
||||
// for standalone mode.
|
||||
private NIOServerCnxn.Factory standaloneServerFactory;
|
||||
|
||||
/**
|
||||
* @throws IOException
|
||||
*/
|
||||
public MiniZooKeeperCluster() throws IOException {
|
||||
/** Create mini ZooKeeper cluster. */
|
||||
public MiniZooKeeperCluster() {
|
||||
this.started = false;
|
||||
}
|
||||
|
||||
|
@ -81,6 +80,13 @@ public class MiniZooKeeperCluster {
|
|||
FileTxnLog.setPreallocSize(100);
|
||||
}
|
||||
|
||||
/**
|
||||
* @return String ZooKeeper quorum servers.
|
||||
*/
|
||||
public String getQuorumServers() {
|
||||
return quorumServers;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param numPeers
|
||||
* @param baseDir
|
||||
|
@ -116,7 +122,8 @@ public class MiniZooKeeperCluster {
|
|||
standaloneServerFactory = new NIOServerCnxn.Factory(CLIENT_PORT_START);
|
||||
standaloneServerFactory.startup(server);
|
||||
|
||||
ZooKeeperWrapper.setQuorumServers("localhost:" + CLIENT_PORT_START);
|
||||
quorumServers = "localhost:" + CLIENT_PORT_START;
|
||||
ZooKeeperWrapper.setQuorumServers(quorumServers);
|
||||
|
||||
if (!waitForServerUp(CLIENT_PORT_START, CONNECTION_TIMEOUT)) {
|
||||
throw new IOException("Waiting for startup of standalone server");
|
||||
|
@ -152,8 +159,8 @@ public class MiniZooKeeperCluster {
|
|||
serversBuffer.append("localhost:" + port);
|
||||
}
|
||||
|
||||
String servers = serversBuffer.toString();
|
||||
ZooKeeperWrapper.setQuorumServers(servers);
|
||||
quorumServers = serversBuffer.toString();
|
||||
ZooKeeperWrapper.setQuorumServers(quorumServers);
|
||||
|
||||
// Start quorum peer threads.
|
||||
for (QuorumPeer qp : quorumPeers) {
|
||||
|
|
|
@ -21,14 +21,24 @@ package org.apache.hadoop.hbase;
|
|||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.hbase.client.HConnection;
|
||||
import org.apache.hadoop.hbase.client.HConnectionManager;
|
||||
import org.apache.hadoop.hbase.client.HTable;
|
||||
import org.apache.hadoop.hbase.master.HMaster;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
|
||||
import org.apache.zookeeper.WatchedEvent;
|
||||
import org.apache.zookeeper.Watcher;
|
||||
import org.apache.zookeeper.ZooKeeper;
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public class TestZooKeeper extends HBaseClusterTestCase {
|
||||
private static class EmptyWatcher implements Watcher {
|
||||
public EmptyWatcher() {}
|
||||
public void process(WatchedEvent event) {}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void setUp() throws Exception {
|
||||
setOpenMetaTable(false);
|
||||
|
@ -71,4 +81,29 @@ public class TestZooKeeper extends HBaseClusterTestCase {
|
|||
ZooKeeperWrapper zooKeeper = new ZooKeeperWrapper(conf);
|
||||
assertTrue(zooKeeper.writeOutOfSafeMode());
|
||||
}
|
||||
|
||||
/**
|
||||
* See HBASE-1232 and http://wiki.apache.org/hadoop/ZooKeeper/FAQ#4.
|
||||
* @throws IOException
|
||||
* @throws InterruptedException
|
||||
*/
|
||||
public void testClientSessionExpired() throws IOException, InterruptedException {
|
||||
new HTable(conf, HConstants.META_TABLE_NAME);
|
||||
|
||||
String quorumServers = zooKeeperCluster.getQuorumServers();
|
||||
int sessionTimeout = conf.getInt("zookeeper.session.timeout", 2 * 1000);
|
||||
Watcher watcher = new EmptyWatcher();
|
||||
HConnection connection = HConnectionManager.getConnection(conf);
|
||||
ZooKeeperWrapper connectionZK = connection.getZooKeeperWrapper();
|
||||
long sessionID = connectionZK.getSessionID();
|
||||
byte[] password = connectionZK.getSessionPassword();
|
||||
|
||||
ZooKeeper zk = new ZooKeeper(quorumServers, sessionTimeout, watcher, sessionID, password);
|
||||
zk.close();
|
||||
|
||||
Thread.sleep(sessionTimeout * 3);
|
||||
|
||||
System.err.println("ZooKeeper should have timed out");
|
||||
connection.relocateRegion(HConstants.ROOT_TABLE_NAME, HConstants.EMPTY_BYTE_ARRAY);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue