HBASE-20597 Use a lock to serialize access to a shared reference to ZooKeeperWatcher in HBaseReplicationEndpoint
This commit is contained in:
parent
498f3bf953
commit
1b70763b9e
@ -43,21 +43,22 @@ import org.apache.zookeeper.KeeperException.SessionExpiredException;
|
||||
* target cluster is an HBase cluster.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="MT_CORRECTNESS",
|
||||
justification="Thinks zkw needs to be synchronized access but should be fine as is.")
|
||||
public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint
|
||||
implements Abortable {
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(HBaseReplicationEndpoint.class);
|
||||
|
||||
private ZooKeeperWatcher zkw = null; // FindBugs: MT_CORRECTNESS
|
||||
private Object zkwLock = new Object();
|
||||
private ZooKeeperWatcher zkw = null;
|
||||
|
||||
private List<ServerName> regionServers = new ArrayList<ServerName>(0);
|
||||
private long lastRegionServerUpdate;
|
||||
|
||||
protected void disconnect() {
|
||||
if (zkw != null) {
|
||||
zkw.close();
|
||||
synchronized (zkwLock) {
|
||||
if (zkw != null) {
|
||||
zkw.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -102,7 +103,9 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint
|
||||
public synchronized UUID getPeerUUID() {
|
||||
UUID peerUUID = null;
|
||||
try {
|
||||
peerUUID = ZKClusterId.getUUIDForCluster(zkw);
|
||||
synchronized (zkwLock) {
|
||||
peerUUID = ZKClusterId.getUUIDForCluster(zkw);
|
||||
}
|
||||
} catch (KeeperException ke) {
|
||||
reconnect(ke);
|
||||
}
|
||||
@ -114,7 +117,9 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint
|
||||
* @return zk connection
|
||||
*/
|
||||
protected ZooKeeperWatcher getZkw() {
|
||||
return zkw;
|
||||
synchronized (zkwLock) {
|
||||
return zkw;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@ -122,10 +127,14 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint
|
||||
* @throws IOException If anything goes wrong connecting
|
||||
*/
|
||||
void reloadZkWatcher() throws IOException {
|
||||
if (zkw != null) zkw.close();
|
||||
zkw = new ZooKeeperWatcher(ctx.getConfiguration(),
|
||||
synchronized (zkwLock) {
|
||||
if (zkw != null) {
|
||||
zkw.close();
|
||||
}
|
||||
zkw = new ZooKeeperWatcher(ctx.getConfiguration(),
|
||||
"connection to cluster: " + ctx.getPeerId(), this);
|
||||
getZkw().registerListener(new PeerRegionServerListener(this));
|
||||
zkw.registerListener(new PeerRegionServerListener(this));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -163,13 +172,15 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint
|
||||
* for this peer cluster
|
||||
* @return list of addresses
|
||||
*/
|
||||
// Synchronize peer cluster connection attempts to avoid races and rate
|
||||
// limit connections when multiple replication sources try to connect to
|
||||
// the peer cluster. If the peer cluster is down we can get out of control
|
||||
// over time.
|
||||
public synchronized List<ServerName> getRegionServers() {
|
||||
public List<ServerName> getRegionServers() {
|
||||
try {
|
||||
setRegionServers(fetchSlavesAddresses(this.getZkw()));
|
||||
// Synchronize peer cluster connection attempts to avoid races and rate
|
||||
// limit connections when multiple replication sources try to connect to
|
||||
// the peer cluster. If the peer cluster is down we can get out of control
|
||||
// over time.
|
||||
synchronized (zkwLock) {
|
||||
setRegionServers(fetchSlavesAddresses(zkw));
|
||||
}
|
||||
} catch (KeeperException ke) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Fetch slaves addresses failed", ke);
|
||||
|
Loading…
x
Reference in New Issue
Block a user