diff --git a/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java b/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java index 61374d94e40..cfe558f62d7 100644 --- a/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java +++ b/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java @@ -24,7 +24,10 @@ import java.io.IOException; import java.util.Map; import org.apache.commons.lang.NotImplementedException; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.client.HConnection; import org.apache.hadoop.hbase.client.HConnectionManager; @@ -66,6 +69,7 @@ import org.apache.zookeeper.KeeperException; *

*/ public class ReplicationAdmin implements Closeable { + private static final Log LOG = LogFactory.getLog(ReplicationAdmin.class); private final ReplicationZookeeper replicationZk; private final HConnection connection; @@ -82,7 +86,7 @@ public class ReplicationAdmin implements Closeable { "enable it in order to use replication"); } this.connection = HConnectionManager.getConnection(conf); - ZooKeeperWatcher zkw = this.connection.getZooKeeperWatcher(); + ZooKeeperWatcher zkw = createZooKeeperWatcher(); try { this.replicationZk = new ReplicationZookeeper(this.connection, conf, zkw); } catch (KeeperException e) { @@ -90,6 +94,24 @@ public class ReplicationAdmin implements Closeable { } } + private ZooKeeperWatcher createZooKeeperWatcher() throws IOException { + return new ZooKeeperWatcher(connection.getConfiguration(), + "Replication Admin", new Abortable() { + @Override + public void abort(String why, Throwable e) { + LOG.error(why, e); + System.exit(1); + } + + @Override + public boolean isAborted() { + return false; + } + + }); + } + + /** * Add a new peer cluster to replicate to. 
* @param id a short that identifies the cluster diff --git a/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java b/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java index 1574439c526..2f05005163a 100644 --- a/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java +++ b/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java @@ -77,6 +77,7 @@ import org.apache.hadoop.hbase.util.hbck.TableIntegrityErrorHandler; import org.apache.hadoop.hbase.util.hbck.TableIntegrityErrorHandlerImpl; import org.apache.hadoop.hbase.zookeeper.RootRegionTracker; import org.apache.hadoop.hbase.zookeeper.ZKTable; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.zookeeper.KeeperException; @@ -954,13 +955,15 @@ public class HBaseFsck { HConnectionManager.execute(new HConnectable(conf) { @Override public Void connect(HConnection connection) throws IOException { - ZooKeeperWatcher zkw = connection.getZooKeeperWatcher(); + ZooKeeperWatcher zkw = createZooKeeperWatcher(); try { for (String tableName : ZKTable.getDisabledOrDisablingTables(zkw)) { disabledTables.add(Bytes.toBytes(tableName)); } } catch (KeeperException ke) { throw new IOException(ke); + } finally { + zkw.close(); } return null; } @@ -1046,8 +1049,8 @@ public class HBaseFsck { ServerName sn; try { sn = getRootRegionServerName(); - } catch (InterruptedException e) { - throw new IOException("Interrupted", e); + } catch (KeeperException e) { + throw new IOException(e); } MetaEntry m = new MetaEntry(rootLocation.getRegionInfo(), sn, System.currentTimeMillis()); @@ -1056,29 +1059,35 @@ public class HBaseFsck { return true; } + private ZooKeeperWatcher createZooKeeperWatcher() throws IOException { + return new ZooKeeperWatcher(conf, "hbase Fsck", new Abortable() { + @Override + public void abort(String why, Throwable e) { + LOG.error(why, e); + System.exit(1); + } + + @Override + public boolean isAborted() { + return false; + } + + }); + } + 
private ServerName getRootRegionServerName() - throws IOException, InterruptedException { - RootRegionTracker rootRegionTracker = - new RootRegionTracker(this.connection.getZooKeeperWatcher(), new Abortable() { - @Override - public void abort(String why, Throwable e) { - LOG.error(why, e); - System.exit(1); - } - @Override - public boolean isAborted(){ - return false; - } - - }); - rootRegionTracker.start(); - ServerName sn = null; + throws IOException, KeeperException { + + ZooKeeperWatcher zkw = createZooKeeperWatcher(); + + byte[] data; try { - sn = rootRegionTracker.getRootRegionLocation(); + data = ZKUtil.getData(zkw, zkw.rootServerZNode); } finally { - rootRegionTracker.stop(); + zkw.close(); } - return sn; + + return RootRegionTracker.dataToServerName(data); } /** diff --git a/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java b/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java index 0078ebccc01..037e354b7e2 100644 --- a/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java +++ b/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java @@ -1179,12 +1179,24 @@ public class ZKUtil { if (zkw == null) throw new IllegalArgumentException(); if (znode == null) throw new IllegalArgumentException(); - ZooKeeperNodeTracker znt = new ZooKeeperNodeTracker(zkw, znode, new Abortable() { - @Override public void abort(String why, Throwable e) {} - @Override public boolean isAborted() {return false;} - }) { - }; + byte[] data = null; + boolean finished = false; + final long endTime = System.currentTimeMillis() + timeout; + while (!finished) { + try { + data = ZKUtil.getData(zkw, znode); + } catch(KeeperException e) { + LOG.warn("Unexpected exception handling blockUntilAvailable", e); + } - return znt.blockUntilAvailable(timeout, true); + if (data == null && (System.currentTimeMillis() + + HConstants.SOCKET_RETRY_WAIT_MS < endTime)) { + Thread.sleep(HConstants.SOCKET_RETRY_WAIT_MS); + } else { + finished = true; + } + } + + return data; } } diff 
--git a/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java index 1c044e3156f..3f239e06814 100644 --- a/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java +++ b/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java @@ -712,6 +712,12 @@ public class HBaseTestingUtility { hbaseAdmin.close(); hbaseAdmin = null; } + + if (zooKeeperWatcher != null) { + zooKeeperWatcher.close(); + zooKeeperWatcher = null; + } + if (this.hbaseCluster != null) { this.hbaseCluster.shutdown(); // Wait till hbase is down before going on to shutdown zk. @@ -1417,6 +1423,32 @@ public class HBaseTestingUtility { } private HBaseAdmin hbaseAdmin = null; + /** + * Returns a ZooKeeperWatcher instance. + * This instance is shared between HBaseTestingUtility instance users. + * Don't close it; it will be closed automatically when the + * cluster shuts down. + * + * @return The ZooKeeperWatcher instance. + * @throws IOException + */ + public synchronized ZooKeeperWatcher getZooKeeperWatcher() + throws IOException { + if (zooKeeperWatcher == null) { + zooKeeperWatcher = new ZooKeeperWatcher(conf, "testing utility", + new Abortable() { + @Override public void abort(String why, Throwable e) { + throw new RuntimeException("Unexpected abort in HBaseTestingUtility:"+why, e); + } + @Override public boolean isAborted() {return false;} + }); + } + return zooKeeperWatcher; + } + private ZooKeeperWatcher zooKeeperWatcher; + + + /** * Closes the named region. 
* diff --git a/src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java b/src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java index fc3c46ec973..27b858b735c 100644 --- a/src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java +++ b/src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java @@ -26,6 +26,8 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.fail; import java.io.IOException; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -87,6 +89,17 @@ public class TestZooKeeper { TEST_UTIL.ensureSomeRegionServersAvailable(2); } + + private ZooKeeperWatcher getZooKeeperWatcher(HConnection c) throws + NoSuchMethodException, InvocationTargetException, IllegalAccessException { + + Method getterZK = c.getClass().getMethod("getKeepAliveZooKeeperWatcher"); + getterZK.setAccessible(true); + + return (ZooKeeperWatcher) getterZK.invoke(c); + } + + /** * See HBASE-1232 and http://wiki.apache.org/hadoop/ZooKeeper/FAQ#4. 
* @throws IOException @@ -102,7 +115,7 @@ public class TestZooKeeper { HConnection connection = HConnectionManager.getConnection(c); - ZooKeeperWatcher connectionZK = connection.getZooKeeperWatcher(); + ZooKeeperWatcher connectionZK = getZooKeeperWatcher(connection); LOG.info("ZooKeeperWatcher= 0x"+ Integer.toHexString( connectionZK.hashCode())); LOG.info("getRecoverableZooKeeper= 0x"+ Integer.toHexString( @@ -143,7 +156,7 @@ public class TestZooKeeper { Assert.assertTrue(state == States.CLOSED); // Check that the client recovered - ZooKeeperWatcher newConnectionZK = connection.getZooKeeperWatcher(); + ZooKeeperWatcher newConnectionZK = getZooKeeperWatcher(connection); States state2 = newConnectionZK.getRecoverableZooKeeper().getState(); LOG.info("After new get state=" +state2); @@ -154,7 +167,7 @@ public class TestZooKeeper { while (System.currentTimeMillis() < limit2 && state2 != States.CONNECTED && state2 != States.CONNECTING) { - newConnectionZK = connection.getZooKeeperWatcher(); + newConnectionZK = getZooKeeperWatcher(connection); state2 = newConnectionZK.getRecoverableZooKeeper().getState(); } LOG.info("After new get state loop=" + state2); @@ -233,11 +246,13 @@ public class TestZooKeeper { ipMeta.exists(new Get(HConstants.LAST_ROW)); // make sure they aren't the same - assertFalse(HConnectionManager.getConnection(localMeta.getConfiguration()).getZooKeeperWatcher() - == HConnectionManager.getConnection(otherConf).getZooKeeperWatcher()); - assertFalse(HConnectionManager.getConnection(localMeta.getConfiguration()) - .getZooKeeperWatcher().getQuorum().equals(HConnectionManager - .getConnection(otherConf).getZooKeeperWatcher().getQuorum())); + ZooKeeperWatcher z1 = + getZooKeeperWatcher(HConnectionManager.getConnection(localMeta.getConfiguration())); + ZooKeeperWatcher z2 = + getZooKeeperWatcher(HConnectionManager.getConnection(otherConf)); + assertFalse(z1 == z2); + assertFalse(z1.getQuorum().equals(z2.getQuorum())); + localMeta.close(); ipMeta.close(); } catch 
(Exception e) { diff --git a/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java b/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java index 9ac4751bd39..bf6b49a2ba2 100644 --- a/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java +++ b/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java @@ -274,6 +274,27 @@ public class TestHCM { assertTrue(c2 != c3); } + + /** + * This test checks that one can connect to the cluster with only the + * ZooKeeper quorum set. Other stuff like master address will be read + * from ZK by the client. + */ + @Test(timeout = 10000) + public void testConnection() throws Exception{ + // We create an empty config and add the ZK address. + Configuration c = new Configuration(); + c.set(HConstants.ZOOKEEPER_QUORUM, + TEST_UTIL.getConfiguration().get(HConstants.ZOOKEEPER_QUORUM)); + c.set(HConstants.ZOOKEEPER_CLIENT_PORT , + TEST_UTIL.getConfiguration().get(HConstants.ZOOKEEPER_CLIENT_PORT)); + + // This should be enough to connect + HConnection conn = HConnectionManager.getConnection(c); + assertTrue( conn.isMasterRunning() ); + conn.close(); + } + @org.junit.Rule public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu = new org.apache.hadoop.hbase.ResourceCheckerJUnitRule(); diff --git a/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java b/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java index 7485832ee48..75b5aeaf4d7 100644 --- a/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java +++ b/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java @@ -132,13 +132,13 @@ public class TestSplitTransactionOnCluster { List daughters = cluster.getRegions(tableName); assertTrue(daughters.size() >= 2); // Assert the ephemeral node is up in zk. 
- String path = ZKAssign.getNodeName(t.getConnection().getZooKeeperWatcher(), + String path = ZKAssign.getNodeName(TESTING_UTIL.getZooKeeperWatcher(), hri.getEncodedName()); Stat stats = - t.getConnection().getZooKeeperWatcher().getRecoverableZooKeeper().exists(path, false); + TESTING_UTIL.getZooKeeperWatcher().getRecoverableZooKeeper().exists(path, false); LOG.info("EPHEMERAL NODE BEFORE SERVER ABORT, path=" + path + ", stats=" + stats); RegionTransitionData rtd = - ZKAssign.getData(t.getConnection().getZooKeeperWatcher(), + ZKAssign.getData(TESTING_UTIL.getZooKeeperWatcher(), hri.getEncodedName()); // State could be SPLIT or SPLITTING. assertTrue(rtd.getEventType().equals(EventType.RS_ZK_REGION_SPLIT) || @@ -158,7 +158,7 @@ public class TestSplitTransactionOnCluster { assertTrue(daughters.contains(r)); } // Finally assert that the ephemeral SPLIT znode was cleaned up. - stats = t.getConnection().getZooKeeperWatcher().getRecoverableZooKeeper().exists(path, false); + stats = TESTING_UTIL.getZooKeeperWatcher().getRecoverableZooKeeper().exists(path, false); LOG.info("EPHEMERAL NODE AFTER SERVER ABORT, path=" + path + ", stats=" + stats); assertTrue(stats == null); } finally { @@ -195,7 +195,7 @@ public class TestSplitTransactionOnCluster { int regionCount = server.getOnlineRegions().size(); // Insert into zk a blocking znode, a znode of same name as region // so it gets in way of our splitting. - ZKAssign.createNodeClosing(t.getConnection().getZooKeeperWatcher(), + ZKAssign.createNodeClosing(TESTING_UTIL.getZooKeeperWatcher(), hri, new ServerName("any.old.server", 1234, -1)); // Now try splitting.... should fail. And each should successfully // rollback. 
@@ -208,7 +208,7 @@ public class TestSplitTransactionOnCluster { assertEquals(regionCount, server.getOnlineRegions().size()); } // Now clear the zknode - ZKAssign.deleteClosingNode(t.getConnection().getZooKeeperWatcher(), hri); + ZKAssign.deleteClosingNode(TESTING_UTIL.getZooKeeperWatcher(), hri); // Now try splitting and it should work. split(hri, server, regionCount); // Get daughters