HBASE-5573 Replace client ZooKeeper watchers by simple ZooKeeper reads (N Keywal)

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1307549 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Zhihong Yu 2012-03-30 17:45:57 +00:00
parent 8ecfa1087c
commit 06e70aa877
7 changed files with 154 additions and 43 deletions

View File

@ -24,7 +24,10 @@ import java.io.IOException;
import java.util.Map; import java.util.Map;
import org.apache.commons.lang.NotImplementedException; import org.apache.commons.lang.NotImplementedException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.HConnection; import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager; import org.apache.hadoop.hbase.client.HConnectionManager;
@ -66,6 +69,7 @@ import org.apache.zookeeper.KeeperException;
* </p> * </p>
*/ */
public class ReplicationAdmin implements Closeable { public class ReplicationAdmin implements Closeable {
private static final Log LOG = LogFactory.getLog(ReplicationAdmin.class);
private final ReplicationZookeeper replicationZk; private final ReplicationZookeeper replicationZk;
private final HConnection connection; private final HConnection connection;
@ -82,7 +86,7 @@ public class ReplicationAdmin implements Closeable {
"enable it in order to use replication"); "enable it in order to use replication");
} }
this.connection = HConnectionManager.getConnection(conf); this.connection = HConnectionManager.getConnection(conf);
ZooKeeperWatcher zkw = this.connection.getZooKeeperWatcher(); ZooKeeperWatcher zkw = createZooKeeperWatcher();
try { try {
this.replicationZk = new ReplicationZookeeper(this.connection, conf, zkw); this.replicationZk = new ReplicationZookeeper(this.connection, conf, zkw);
} catch (KeeperException e) { } catch (KeeperException e) {
@ -90,6 +94,24 @@ public class ReplicationAdmin implements Closeable {
} }
} }
private ZooKeeperWatcher createZooKeeperWatcher() throws IOException {
return new ZooKeeperWatcher(connection.getConfiguration(),
"Replication Admin", new Abortable() {
@Override
public void abort(String why, Throwable e) {
LOG.error(why, e);
System.exit(1);
}
@Override
public boolean isAborted() {
return false;
}
});
}
/** /**
* Add a new peer cluster to replicate to. * Add a new peer cluster to replicate to.
* @param id a short that identifies the cluster * @param id a short that identifies the cluster

View File

@ -77,6 +77,7 @@ import org.apache.hadoop.hbase.util.hbck.TableIntegrityErrorHandler;
import org.apache.hadoop.hbase.util.hbck.TableIntegrityErrorHandlerImpl; import org.apache.hadoop.hbase.util.hbck.TableIntegrityErrorHandlerImpl;
import org.apache.hadoop.hbase.zookeeper.RootRegionTracker; import org.apache.hadoop.hbase.zookeeper.RootRegionTracker;
import org.apache.hadoop.hbase.zookeeper.ZKTable; import org.apache.hadoop.hbase.zookeeper.ZKTable;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException;
@ -954,13 +955,15 @@ public class HBaseFsck {
HConnectionManager.execute(new HConnectable<Void>(conf) { HConnectionManager.execute(new HConnectable<Void>(conf) {
@Override @Override
public Void connect(HConnection connection) throws IOException { public Void connect(HConnection connection) throws IOException {
ZooKeeperWatcher zkw = connection.getZooKeeperWatcher(); ZooKeeperWatcher zkw = createZooKeeperWatcher();
try { try {
for (String tableName : ZKTable.getDisabledOrDisablingTables(zkw)) { for (String tableName : ZKTable.getDisabledOrDisablingTables(zkw)) {
disabledTables.add(Bytes.toBytes(tableName)); disabledTables.add(Bytes.toBytes(tableName));
} }
} catch (KeeperException ke) { } catch (KeeperException ke) {
throw new IOException(ke); throw new IOException(ke);
} finally {
zkw.close();
} }
return null; return null;
} }
@ -1046,8 +1049,8 @@ public class HBaseFsck {
ServerName sn; ServerName sn;
try { try {
sn = getRootRegionServerName(); sn = getRootRegionServerName();
} catch (InterruptedException e) { } catch (KeeperException e) {
throw new IOException("Interrupted", e); throw new IOException(e);
} }
MetaEntry m = MetaEntry m =
new MetaEntry(rootLocation.getRegionInfo(), sn, System.currentTimeMillis()); new MetaEntry(rootLocation.getRegionInfo(), sn, System.currentTimeMillis());
@ -1056,29 +1059,35 @@ public class HBaseFsck {
return true; return true;
} }
private ZooKeeperWatcher createZooKeeperWatcher() throws IOException {
return new ZooKeeperWatcher(conf, "hbase Fsck", new Abortable() {
@Override
public void abort(String why, Throwable e) {
LOG.error(why, e);
System.exit(1);
}
@Override
public boolean isAborted() {
return false;
}
});
}
private ServerName getRootRegionServerName() private ServerName getRootRegionServerName()
throws IOException, InterruptedException { throws IOException, KeeperException {
RootRegionTracker rootRegionTracker =
new RootRegionTracker(this.connection.getZooKeeperWatcher(), new Abortable() { ZooKeeperWatcher zkw = createZooKeeperWatcher();
@Override
public void abort(String why, Throwable e) { byte[] data;
LOG.error(why, e);
System.exit(1);
}
@Override
public boolean isAborted(){
return false;
}
});
rootRegionTracker.start();
ServerName sn = null;
try { try {
sn = rootRegionTracker.getRootRegionLocation(); data = ZKUtil.getData(zkw, zkw.rootServerZNode);
} finally { } finally {
rootRegionTracker.stop(); zkw.close();
} }
return sn;
return RootRegionTracker.dataToServerName(data);
} }
/** /**

View File

@ -1179,12 +1179,24 @@ public class ZKUtil {
if (zkw == null) throw new IllegalArgumentException(); if (zkw == null) throw new IllegalArgumentException();
if (znode == null) throw new IllegalArgumentException(); if (znode == null) throw new IllegalArgumentException();
ZooKeeperNodeTracker znt = new ZooKeeperNodeTracker(zkw, znode, new Abortable() { byte[] data = null;
@Override public void abort(String why, Throwable e) {} boolean finished = false;
@Override public boolean isAborted() {return false;} final long endTime = System.currentTimeMillis() + timeout;
}) { while (!finished) {
}; try {
data = ZKUtil.getData(zkw, znode);
} catch(KeeperException e) {
LOG.warn("Unexpected exception handling blockUntilAvailable", e);
}
return znt.blockUntilAvailable(timeout, true); if (data == null && (System.currentTimeMillis() +
HConstants.SOCKET_RETRY_WAIT_MS < endTime)) {
Thread.sleep(HConstants.SOCKET_RETRY_WAIT_MS);
} else {
finished = true;
}
}
return data;
} }
} }

View File

@ -712,6 +712,12 @@ public class HBaseTestingUtility {
hbaseAdmin.close(); hbaseAdmin.close();
hbaseAdmin = null; hbaseAdmin = null;
} }
if (zooKeeperWatcher != null) {
zooKeeperWatcher.close();
zooKeeperWatcher = null;
}
if (this.hbaseCluster != null) { if (this.hbaseCluster != null) {
this.hbaseCluster.shutdown(); this.hbaseCluster.shutdown();
// Wait till hbase is down before going on to shutdown zk. // Wait till hbase is down before going on to shutdown zk.
@ -1417,6 +1423,32 @@ public class HBaseTestingUtility {
} }
private HBaseAdmin hbaseAdmin = null; private HBaseAdmin hbaseAdmin = null;
/**
* Returns a ZooKeeperWatcher instance.
* This instance is shared between HBaseTestingUtility instance users.
* Don't close it, it will be closed automatically when the
* cluster shutdowns
*
* @return The ZooKeeperWatcher instance.
* @throws IOException
*/
public synchronized ZooKeeperWatcher getZooKeeperWatcher()
throws IOException {
if (zooKeeperWatcher == null) {
zooKeeperWatcher = new ZooKeeperWatcher(conf, "testing utility",
new Abortable() {
@Override public void abort(String why, Throwable e) {
throw new RuntimeException("Unexpected abort in HBaseTestingUtility:"+why, e);
}
@Override public boolean isAborted() {return false;}
});
}
return zooKeeperWatcher;
}
private ZooKeeperWatcher zooKeeperWatcher;
/** /**
* Closes the named region. * Closes the named region.
* *

View File

@ -26,6 +26,8 @@ import static org.junit.Assert.assertNull;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
import java.io.IOException; import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -87,6 +89,17 @@ public class TestZooKeeper {
TEST_UTIL.ensureSomeRegionServersAvailable(2); TEST_UTIL.ensureSomeRegionServersAvailable(2);
} }
private ZooKeeperWatcher getZooKeeperWatcher(HConnection c) throws
NoSuchMethodException, InvocationTargetException, IllegalAccessException {
Method getterZK = c.getClass().getMethod("getKeepAliveZooKeeperWatcher");
getterZK.setAccessible(true);
return (ZooKeeperWatcher) getterZK.invoke(c);
}
/** /**
* See HBASE-1232 and http://wiki.apache.org/hadoop/ZooKeeper/FAQ#4. * See HBASE-1232 and http://wiki.apache.org/hadoop/ZooKeeper/FAQ#4.
* @throws IOException * @throws IOException
@ -102,7 +115,7 @@ public class TestZooKeeper {
HConnection connection = HConnectionManager.getConnection(c); HConnection connection = HConnectionManager.getConnection(c);
ZooKeeperWatcher connectionZK = connection.getZooKeeperWatcher(); ZooKeeperWatcher connectionZK = getZooKeeperWatcher(connection);
LOG.info("ZooKeeperWatcher= 0x"+ Integer.toHexString( LOG.info("ZooKeeperWatcher= 0x"+ Integer.toHexString(
connectionZK.hashCode())); connectionZK.hashCode()));
LOG.info("getRecoverableZooKeeper= 0x"+ Integer.toHexString( LOG.info("getRecoverableZooKeeper= 0x"+ Integer.toHexString(
@ -143,7 +156,7 @@ public class TestZooKeeper {
Assert.assertTrue(state == States.CLOSED); Assert.assertTrue(state == States.CLOSED);
// Check that the client recovered // Check that the client recovered
ZooKeeperWatcher newConnectionZK = connection.getZooKeeperWatcher(); ZooKeeperWatcher newConnectionZK = getZooKeeperWatcher(connection);
States state2 = newConnectionZK.getRecoverableZooKeeper().getState(); States state2 = newConnectionZK.getRecoverableZooKeeper().getState();
LOG.info("After new get state=" +state2); LOG.info("After new get state=" +state2);
@ -154,7 +167,7 @@ public class TestZooKeeper {
while (System.currentTimeMillis() < limit2 && while (System.currentTimeMillis() < limit2 &&
state2 != States.CONNECTED && state2 != States.CONNECTING) { state2 != States.CONNECTED && state2 != States.CONNECTING) {
newConnectionZK = connection.getZooKeeperWatcher(); newConnectionZK = getZooKeeperWatcher(connection);
state2 = newConnectionZK.getRecoverableZooKeeper().getState(); state2 = newConnectionZK.getRecoverableZooKeeper().getState();
} }
LOG.info("After new get state loop=" + state2); LOG.info("After new get state loop=" + state2);
@ -233,11 +246,13 @@ public class TestZooKeeper {
ipMeta.exists(new Get(HConstants.LAST_ROW)); ipMeta.exists(new Get(HConstants.LAST_ROW));
// make sure they aren't the same // make sure they aren't the same
assertFalse(HConnectionManager.getConnection(localMeta.getConfiguration()).getZooKeeperWatcher() ZooKeeperWatcher z1 =
== HConnectionManager.getConnection(otherConf).getZooKeeperWatcher()); getZooKeeperWatcher(HConnectionManager.getConnection(localMeta.getConfiguration()));
assertFalse(HConnectionManager.getConnection(localMeta.getConfiguration()) ZooKeeperWatcher z2 =
.getZooKeeperWatcher().getQuorum().equals(HConnectionManager getZooKeeperWatcher(HConnectionManager.getConnection(otherConf));
.getConnection(otherConf).getZooKeeperWatcher().getQuorum())); assertFalse(z1 == z2);
assertFalse(z1.getQuorum().equals(z2.getQuorum()));
localMeta.close(); localMeta.close();
ipMeta.close(); ipMeta.close();
} catch (Exception e) { } catch (Exception e) {

View File

@ -274,6 +274,27 @@ public class TestHCM {
assertTrue(c2 != c3); assertTrue(c2 != c3);
} }
/**
* This test checks that one can connect to the cluster with only the
* ZooKeeper quorum set. Other stuff like master address will be read
* from ZK by the client.
*/
@Test(timeout = 10000)
public void testConnection() throws Exception{
// We create an empty config and add the ZK address.
Configuration c = new Configuration();
c.set(HConstants.ZOOKEEPER_QUORUM,
TEST_UTIL.getConfiguration().get(HConstants.ZOOKEEPER_QUORUM));
c.set(HConstants.ZOOKEEPER_CLIENT_PORT ,
TEST_UTIL.getConfiguration().get(HConstants.ZOOKEEPER_CLIENT_PORT));
// This should be enough to connect
HConnection conn = HConnectionManager.getConnection(c);
assertTrue( conn.isMasterRunning() );
conn.close();
}
@org.junit.Rule @org.junit.Rule
public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu = public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu =
new org.apache.hadoop.hbase.ResourceCheckerJUnitRule(); new org.apache.hadoop.hbase.ResourceCheckerJUnitRule();

View File

@ -132,13 +132,13 @@ public class TestSplitTransactionOnCluster {
List<HRegion> daughters = cluster.getRegions(tableName); List<HRegion> daughters = cluster.getRegions(tableName);
assertTrue(daughters.size() >= 2); assertTrue(daughters.size() >= 2);
// Assert the ephemeral node is up in zk. // Assert the ephemeral node is up in zk.
String path = ZKAssign.getNodeName(t.getConnection().getZooKeeperWatcher(), String path = ZKAssign.getNodeName(TESTING_UTIL.getZooKeeperWatcher(),
hri.getEncodedName()); hri.getEncodedName());
Stat stats = Stat stats =
t.getConnection().getZooKeeperWatcher().getRecoverableZooKeeper().exists(path, false); TESTING_UTIL.getZooKeeperWatcher().getRecoverableZooKeeper().exists(path, false);
LOG.info("EPHEMERAL NODE BEFORE SERVER ABORT, path=" + path + ", stats=" + stats); LOG.info("EPHEMERAL NODE BEFORE SERVER ABORT, path=" + path + ", stats=" + stats);
RegionTransitionData rtd = RegionTransitionData rtd =
ZKAssign.getData(t.getConnection().getZooKeeperWatcher(), ZKAssign.getData(TESTING_UTIL.getZooKeeperWatcher(),
hri.getEncodedName()); hri.getEncodedName());
// State could be SPLIT or SPLITTING. // State could be SPLIT or SPLITTING.
assertTrue(rtd.getEventType().equals(EventType.RS_ZK_REGION_SPLIT) || assertTrue(rtd.getEventType().equals(EventType.RS_ZK_REGION_SPLIT) ||
@ -158,7 +158,7 @@ public class TestSplitTransactionOnCluster {
assertTrue(daughters.contains(r)); assertTrue(daughters.contains(r));
} }
// Finally assert that the ephemeral SPLIT znode was cleaned up. // Finally assert that the ephemeral SPLIT znode was cleaned up.
stats = t.getConnection().getZooKeeperWatcher().getRecoverableZooKeeper().exists(path, false); stats = TESTING_UTIL.getZooKeeperWatcher().getRecoverableZooKeeper().exists(path, false);
LOG.info("EPHEMERAL NODE AFTER SERVER ABORT, path=" + path + ", stats=" + stats); LOG.info("EPHEMERAL NODE AFTER SERVER ABORT, path=" + path + ", stats=" + stats);
assertTrue(stats == null); assertTrue(stats == null);
} finally { } finally {
@ -195,7 +195,7 @@ public class TestSplitTransactionOnCluster {
int regionCount = server.getOnlineRegions().size(); int regionCount = server.getOnlineRegions().size();
// Insert into zk a blocking znode, a znode of same name as region // Insert into zk a blocking znode, a znode of same name as region
// so it gets in way of our splitting. // so it gets in way of our splitting.
ZKAssign.createNodeClosing(t.getConnection().getZooKeeperWatcher(), ZKAssign.createNodeClosing(TESTING_UTIL.getZooKeeperWatcher(),
hri, new ServerName("any.old.server", 1234, -1)); hri, new ServerName("any.old.server", 1234, -1));
// Now try splitting.... should fail. And each should successfully // Now try splitting.... should fail. And each should successfully
// rollback. // rollback.
@ -208,7 +208,7 @@ public class TestSplitTransactionOnCluster {
assertEquals(regionCount, server.getOnlineRegions().size()); assertEquals(regionCount, server.getOnlineRegions().size());
} }
// Now clear the zknode // Now clear the zknode
ZKAssign.deleteClosingNode(t.getConnection().getZooKeeperWatcher(), hri); ZKAssign.deleteClosingNode(TESTING_UTIL.getZooKeeperWatcher(), hri);
// Now try splitting and it should work. // Now try splitting and it should work.
split(hri, server, regionCount); split(hri, server, regionCount);
// Get daughters // Get daughters