HBASE-1762 Remove concept of ZooKeeper from HConnection interface

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1480449 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Michael Stack 2013-05-08 20:49:30 +00:00
parent 6ad28427b1
commit 4dc244ce5a
7 changed files with 67 additions and 63 deletions

View File

@ -38,7 +38,6 @@ import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.MasterAdminService; import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.MasterAdminService;
import org.apache.hadoop.hbase.protobuf.generated.MasterMonitorProtos.MasterMonitorService; import org.apache.hadoop.hbase.protobuf.generated.MasterMonitorProtos.MasterMonitorService;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
/** /**
* A cluster connection. Knows how to find the master, locate regions out on the cluster, * A cluster connection. Knows how to find the master, locate regions out on the cluster,
@ -48,8 +47,7 @@ import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
* connections at a lower level. * connections at a lower level.
* *
* <p>HConnections are used by {@link HTable} mostly but also by * <p>HConnections are used by {@link HTable} mostly but also by
* {@link HBaseAdmin}, {@link CatalogTracker}, * {@link HBaseAdmin}, and {@link CatalogTracker}. HConnection instances can be shared. Sharing
* and {@link ZooKeeperWatcher}. HConnection instances can be shared. Sharing
* is usually what you want because rather than each HConnection instance * is usually what you want because rather than each HConnection instance
* having to do its own discovery of regions out on the cluster, instead, all * having to do its own discovery of regions out on the cluster, instead, all
* clients get to share the one cache of locations. {@link HConnectionManager} does the * clients get to share the one cache of locations. {@link HConnectionManager} does the
@ -66,17 +64,6 @@ public interface HConnection extends Abortable, Closeable {
*/ */
public Configuration getConfiguration(); public Configuration getConfiguration();
/**
* Retrieve ZooKeeperWatcher used by this connection.
* @return ZooKeeperWatcher handle being used by the connection.
* @throws IOException if a remote or network exception occurs
* @deprecated Removed because it was a mistake exposing zookeeper in this
* interface (ZooKeeper is an implementation detail).
* Deprecated in HBase 0.94
*/
@Deprecated
public ZooKeeperWatcher getZooKeeperWatcher() throws IOException;
/** @return - true if the master server is running */ /** @return - true if the master server is running */
public boolean isMasterRunning() public boolean isMasterRunning()
throws MasterNotRunningException, ZooKeeperConnectionException; throws MasterNotRunningException, ZooKeeperConnectionException;

View File

@ -1546,24 +1546,6 @@ public class HConnectionManager {
return serviceName + "@" + rsHostnamePort; return serviceName + "@" + rsHostnamePort;
} }
@Override
@Deprecated
public ZooKeeperWatcher getZooKeeperWatcher()
throws ZooKeeperConnectionException {
canCloseZKW = false;
try {
return getKeepAliveZooKeeperWatcher();
} catch (ZooKeeperConnectionException e){
throw e;
}catch (IOException e) {
// Encapsulate exception to keep interface
throw new ZooKeeperConnectionException(
"Can't create a zookeeper connection", e);
}
}
private ZooKeeperKeepAliveConnection keepAliveZookeeper; private ZooKeeperKeepAliveConnection keepAliveZookeeper;
private int keepAliveZookeeperUserCount; private int keepAliveZookeeperUserCount;
private boolean canCloseZKW = true; private boolean canCloseZKW = true;

View File

@ -45,8 +45,10 @@ class ZooKeeperKeepAliveConnection extends ZooKeeperWatcher{
@Override @Override
public void close() { public void close() {
if (this.abortable != null) {
((HConnectionManager.HConnectionImplementation)abortable).releaseZooKeeperWatcher(this); ((HConnectionManager.HConnectionImplementation)abortable).releaseZooKeeperWatcher(this);
} }
}
void internalClose(){ void internalClose(){
super.close(); super.close();

View File

@ -68,6 +68,8 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable {
// abortable in case of zk failure // abortable in case of zk failure
protected Abortable abortable; protected Abortable abortable;
// Used if abortable is null
private boolean aborted = false;
// listeners to be notified // listeners to be notified
private final List<ZooKeeperListener> listeners = private final List<ZooKeeperListener> listeners =
@ -128,10 +130,15 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable {
Abortable abortable) throws ZooKeeperConnectionException, IOException { Abortable abortable) throws ZooKeeperConnectionException, IOException {
this(conf, identifier, abortable, false); this(conf, identifier, abortable, false);
} }
/** /**
* Instantiate a ZooKeeper connection and watcher. * Instantiate a ZooKeeper connection and watcher.
* @param identifier string that is passed to RecoverableZookeeper to be used as * @param identifier string that is passed to RecoverableZookeeper to be used as
* identifier for this instance. Use null for default. * identifier for this instance. Use null for default.
* @param conf
* @param abortable Can be null if there is on error there is no host to abort: e.g. client
* context.
* @param canCreateBaseZNode
* @throws IOException * @throws IOException
* @throws ZooKeeperConnectionException * @throws ZooKeeperConnectionException
*/ */
@ -361,8 +368,9 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable {
"ZooKeeper, aborting"); "ZooKeeper, aborting");
// TODO: One thought is to add call to ZooKeeperListener so say, // TODO: One thought is to add call to ZooKeeperListener so say,
// ZooKeeperNodeTracker can zero out its data values. // ZooKeeperNodeTracker can zero out its data values.
if (this.abortable != null) this.abortable.abort(msg, if (this.abortable != null) {
new KeeperException.SessionExpiredException()); this.abortable.abort(msg, new KeeperException.SessionExpiredException());
}
break; break;
case ConnectedReadOnly: case ConnectedReadOnly:
@ -444,12 +452,13 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable {
@Override @Override
public void abort(String why, Throwable e) { public void abort(String why, Throwable e) {
this.abortable.abort(why, e); if (this.abortable != null) this.abortable.abort(why, e);
else this.aborted = true;
} }
@Override @Override
public boolean isAborted() { public boolean isAborted() {
return this.abortable.isAborted(); return this.abortable == null? this.aborted: this.abortable.isAborted();
} }
/** /**

View File

@ -175,20 +175,27 @@ public class Import {
Configuration conf = context.getConfiguration(); Configuration conf = context.getConfiguration();
cfRenameMap = createCfRenameMap(conf); cfRenameMap = createCfRenameMap(conf);
filter = instantiateFilter(conf); filter = instantiateFilter(conf);
// TODO: This is kind of ugly doing setup of ZKW just to read the clusterid.
ReplicationZookeeper zkHelper = null;
ZooKeeperWatcher zkw = null;
try { try {
HConnection connection = HConnectionManager.getConnection(conf); HConnection connection = HConnectionManager.getConnection(conf);
ZooKeeperWatcher zkw = connection.getZooKeeperWatcher(); zkw = new ZooKeeperWatcher(conf, context.getTaskAttemptID().toString(), null);
ReplicationZookeeper zkHelper = new ReplicationZookeeper(connection, conf, zkw); zkHelper = new ReplicationZookeeper(connection, conf, zkw);
clusterId = zkHelper.getUUIDForCluster(zkw); try {
this.clusterId = zkHelper.getUUIDForCluster(zkw);
} finally {
if (zkHelper != null) zkHelper.close();
}
} catch (ZooKeeperConnectionException e) { } catch (ZooKeeperConnectionException e) {
LOG.error("Problem connecting to ZooKeper during task setup", e); LOG.error("Problem connecting to ZooKeper during task setup", e);
} catch (KeeperException e) { } catch (KeeperException e) {
LOG.error("Problem reading ZooKeeper data during task setup", e); LOG.error("Problem reading ZooKeeper data during task setup", e);
} catch (IOException e) { } catch (IOException e) {
LOG.error("Problem setting up task", e); LOG.error("Problem setting up task", e);
} finally {
if (zkw != null) zkw.close();
} }
} }
} }

View File

@ -32,7 +32,8 @@ module Hbase
@admin = org.apache.hadoop.hbase.client.HBaseAdmin.new(configuration) @admin = org.apache.hadoop.hbase.client.HBaseAdmin.new(configuration)
connection = @admin.getConnection() connection = @admin.getConnection()
@conf = configuration @conf = configuration
@zk_wrapper = connection.getZooKeeperWatcher() @zk_wrapper = org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher.new(configuration,
"admin", nil)
zk = @zk_wrapper.getRecoverableZooKeeper().getZooKeeper() zk = @zk_wrapper.getRecoverableZooKeeper().getZooKeeper()
@zk_main = org.apache.zookeeper.ZooKeeperMain.new(zk) @zk_main = org.apache.zookeeper.ZooKeeperMain.new(zk)
@formatter = formatter @formatter = formatter

View File

@ -37,6 +37,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HBaseIOException; import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
@ -426,6 +427,23 @@ public class TestSplitTransactionOnCluster {
} }
} }
/**
* Noop Abortable implementation used below in tests.
*/
static class UselessTestAbortable implements Abortable {
boolean aborted = false;
@Override
public void abort(String why, Throwable e) {
LOG.warn("ABORTED (But nothing to abort): why=" + why, e);
aborted = true;
}
@Override
public boolean isAborted() {
return this.aborted;
}
}
/** /**
* Verifies HBASE-5806. When splitting is partially done and the master goes down * Verifies HBASE-5806. When splitting is partially done and the master goes down
* when the SPLIT node is in either SPLIT or SPLITTING state. * when the SPLIT node is in either SPLIT or SPLITTING state.
@ -453,6 +471,8 @@ public class TestSplitTransactionOnCluster {
this.admin.setBalancerRunning(false, true); this.admin.setBalancerRunning(false, true);
// Turn off the meta scanner so it don't remove parent on us. // Turn off the meta scanner so it don't remove parent on us.
cluster.getMaster().setCatalogJanitorEnabled(false); cluster.getMaster().setCatalogJanitorEnabled(false);
ZooKeeperWatcher zkw = new ZooKeeperWatcher(t.getConfiguration(),
"testMasterRestartWhenSplittingIsPartial", new UselessTestAbortable());
try { try {
// Add a bit of load up into the table so splittable. // Add a bit of load up into the table so splittable.
TESTING_UTIL.loadTable(t, HConstants.CATALOG_FAMILY); TESTING_UTIL.loadTable(t, HConstants.CATALOG_FAMILY);
@ -467,14 +487,11 @@ public class TestSplitTransactionOnCluster {
this.admin.split(hri.getRegionNameAsString()); this.admin.split(hri.getRegionNameAsString());
checkAndGetDaughters(tableName); checkAndGetDaughters(tableName);
// Assert the ephemeral node is up in zk. // Assert the ephemeral node is up in zk.
String path = ZKAssign.getNodeName(t.getConnection() String path = ZKAssign.getNodeName(zkw, hri.getEncodedName());
.getZooKeeperWatcher(), hri.getEncodedName()); Stat stats = zkw.getRecoverableZooKeeper().exists(path, false);
Stat stats = t.getConnection().getZooKeeperWatcher()
.getRecoverableZooKeeper().exists(path, false);
LOG.info("EPHEMERAL NODE BEFORE SERVER ABORT, path=" + path + ", stats=" LOG.info("EPHEMERAL NODE BEFORE SERVER ABORT, path=" + path + ", stats="
+ stats); + stats);
byte[] bytes = ZKAssign.getData(t.getConnection() byte[] bytes = ZKAssign.getData(zkw, hri.getEncodedName());
.getZooKeeperWatcher(), hri.getEncodedName());
RegionTransition rtd = RegionTransition.parseFrom(bytes); RegionTransition rtd = RegionTransition.parseFrom(bytes);
// State could be SPLIT or SPLITTING. // State could be SPLIT or SPLITTING.
assertTrue(rtd.getEventType().equals(EventType.RS_ZK_REGION_SPLIT) assertTrue(rtd.getEventType().equals(EventType.RS_ZK_REGION_SPLIT)
@ -498,6 +515,7 @@ public class TestSplitTransactionOnCluster {
admin.setBalancerRunning(true, false); admin.setBalancerRunning(true, false);
cluster.getMaster().setCatalogJanitorEnabled(true); cluster.getMaster().setCatalogJanitorEnabled(true);
t.close(); t.close();
zkw.close();
} }
} }
@ -526,6 +544,8 @@ public class TestSplitTransactionOnCluster {
this.admin.setBalancerRunning(false, true); this.admin.setBalancerRunning(false, true);
// Turn off the meta scanner so it don't remove parent on us. // Turn off the meta scanner so it don't remove parent on us.
cluster.getMaster().setCatalogJanitorEnabled(false); cluster.getMaster().setCatalogJanitorEnabled(false);
ZooKeeperWatcher zkw = new ZooKeeperWatcher(t.getConfiguration(),
"testMasterRestartAtRegionSplitPendingCatalogJanitor", new UselessTestAbortable());
try { try {
// Add a bit of load up into the table so splittable. // Add a bit of load up into the table so splittable.
TESTING_UTIL.loadTable(t, HConstants.CATALOG_FAMILY); TESTING_UTIL.loadTable(t, HConstants.CATALOG_FAMILY);
@ -536,22 +556,17 @@ public class TestSplitTransactionOnCluster {
this.admin.split(hri.getRegionNameAsString()); this.admin.split(hri.getRegionNameAsString());
checkAndGetDaughters(tableName); checkAndGetDaughters(tableName);
// Assert the ephemeral node is up in zk. // Assert the ephemeral node is up in zk.
String path = ZKAssign.getNodeName(t.getConnection() String path = ZKAssign.getNodeName(zkw, hri.getEncodedName());
.getZooKeeperWatcher(), hri.getEncodedName()); Stat stats = zkw.getRecoverableZooKeeper().exists(path, false);
Stat stats = t.getConnection().getZooKeeperWatcher()
.getRecoverableZooKeeper().exists(path, false);
LOG.info("EPHEMERAL NODE BEFORE SERVER ABORT, path=" + path + ", stats=" LOG.info("EPHEMERAL NODE BEFORE SERVER ABORT, path=" + path + ", stats="
+ stats); + stats);
String node = ZKAssign.getNodeName(t.getConnection() String node = ZKAssign.getNodeName(zkw, hri.getEncodedName());
.getZooKeeperWatcher(), hri.getEncodedName());
Stat stat = new Stat(); Stat stat = new Stat();
byte[] data = ZKUtil.getDataNoWatch(t.getConnection() byte[] data = ZKUtil.getDataNoWatch(zkw, node, stat);
.getZooKeeperWatcher(), node, stat);
// ZKUtil.create // ZKUtil.create
for (int i=0; data != null && i<60; i++) { for (int i=0; data != null && i<60; i++) {
Thread.sleep(1000); Thread.sleep(1000);
data = ZKUtil.getDataNoWatch(t.getConnection().getZooKeeperWatcher(), data = ZKUtil.getDataNoWatch(zkw, node, stat);
node, stat);
} }
assertNull("Waited too long for ZK node to be removed: "+node, data); assertNull("Waited too long for ZK node to be removed: "+node, data);
@ -571,6 +586,7 @@ public class TestSplitTransactionOnCluster {
this.admin.setBalancerRunning(true, false); this.admin.setBalancerRunning(true, false);
cluster.getMaster().setCatalogJanitorEnabled(true); cluster.getMaster().setCatalogJanitorEnabled(true);
t.close(); t.close();
zkw.close();
} }
} }