Revert "HBASE-20597 Use a lock to serialize access to a shared reference to ZooKeeperWatcher in HBaseReplicationEndpoint"

This reverts commit 60dcef289b.
This commit is contained in:
Andrew Purtell 2018-05-29 11:24:30 -07:00
parent d36cce1574
commit 7f154dc20e
1 changed files with 16 additions and 27 deletions

View File

@ -43,22 +43,21 @@ import org.slf4j.LoggerFactory;
* target cluster is an HBase cluster.
*/
@InterfaceAudience.Private
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="MT_CORRECTNESS",
justification="Thinks zkw needs to be synchronized access but should be fine as is.")
public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint
implements Abortable {
private static final Logger LOG = LoggerFactory.getLogger(HBaseReplicationEndpoint.class);
private Object zkwLock = new Object();
private ZKWatcher zkw = null;
private ZKWatcher zkw = null; // FindBugs: MT_CORRECTNESS
private List<ServerName> regionServers = new ArrayList<>(0);
private long lastRegionServerUpdate;
protected void disconnect() {
synchronized (zkwLock) {
if (zkw != null) {
zkw.close();
}
if (zkw != null) {
zkw.close();
}
}
@ -113,9 +112,7 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint
public synchronized UUID getPeerUUID() {
UUID peerUUID = null;
try {
synchronized (zkwLock) {
peerUUID = ZKClusterId.getUUIDForCluster(zkw);
}
peerUUID = ZKClusterId.getUUIDForCluster(zkw);
} catch (KeeperException ke) {
reconnect(ke);
}
@ -127,9 +124,7 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint
* @return zk connection
*/
protected ZKWatcher getZkw() {
synchronized (zkwLock) {
return zkw;
}
return zkw;
}
/**
@ -137,14 +132,10 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint
* @throws IOException If anything goes wrong connecting
*/
void reloadZkWatcher() throws IOException {
synchronized (zkwLock) {
if (zkw != null) {
zkw.close();
}
zkw = new ZKWatcher(ctx.getConfiguration(),
if (zkw != null) zkw.close();
zkw = new ZKWatcher(ctx.getConfiguration(),
"connection to cluster: " + ctx.getPeerId(), this);
zkw.registerListener(new PeerRegionServerListener(this));
}
getZkw().registerListener(new PeerRegionServerListener(this));
}
@Override
@ -182,15 +173,13 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint
* for this peer cluster
* @return list of addresses
*/
public List<ServerName> getRegionServers() {
// Synchronize peer cluster connection attempts to avoid races and rate
// limit connections when multiple replication sources try to connect to
// the peer cluster. If the peer cluster is down we can get out of control
// over time.
public synchronized List<ServerName> getRegionServers() {
try {
// Synchronize peer cluster connection attempts to avoid races and rate
// limit connections when multiple replication sources try to connect to
// the peer cluster. If the peer cluster is down we can get out of control
// over time.
synchronized (zkwLock) {
setRegionServers(fetchSlavesAddresses(zkw));
}
setRegionServers(fetchSlavesAddresses(this.getZkw()));
} catch (KeeperException ke) {
if (LOG.isDebugEnabled()) {
LOG.debug("Fetch slaves addresses failed", ke);