HBASE-6325 [replication] Race in ReplicationSourceManager.init can initiate a failover even if the node is alive
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1363573 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
309f846827
commit
ce00f26e71
|
@ -81,7 +81,7 @@ public class ReplicationSourceManager {
|
||||||
// The path to the latest log we saw, for new coming sources
|
// The path to the latest log we saw, for new coming sources
|
||||||
private Path latestPath;
|
private Path latestPath;
|
||||||
// List of all the other region servers in this cluster
|
// List of all the other region servers in this cluster
|
||||||
private final List<String> otherRegionServers;
|
private final List<String> otherRegionServers = new ArrayList<String>();
|
||||||
// Path to the hlogs directories
|
// Path to the hlogs directories
|
||||||
private final Path logDir;
|
private final Path logDir;
|
||||||
// Path to the hlog archive
|
// Path to the hlog archive
|
||||||
|
@ -122,12 +122,9 @@ public class ReplicationSourceManager {
|
||||||
this.sleepBeforeFailover = conf.getLong("replication.sleep.before.failover", 2000);
|
this.sleepBeforeFailover = conf.getLong("replication.sleep.before.failover", 2000);
|
||||||
this.zkHelper.registerRegionServerListener(
|
this.zkHelper.registerRegionServerListener(
|
||||||
new OtherRegionServerWatcher(this.zkHelper.getZookeeperWatcher()));
|
new OtherRegionServerWatcher(this.zkHelper.getZookeeperWatcher()));
|
||||||
List<String> otherRSs =
|
|
||||||
this.zkHelper.getRegisteredRegionServers();
|
|
||||||
this.zkHelper.registerRegionServerListener(
|
this.zkHelper.registerRegionServerListener(
|
||||||
new PeersWatcher(this.zkHelper.getZookeeperWatcher()));
|
new PeersWatcher(this.zkHelper.getZookeeperWatcher()));
|
||||||
this.zkHelper.listPeersIdsAndWatch();
|
this.zkHelper.listPeersIdsAndWatch();
|
||||||
this.otherRegionServers = otherRSs == null ? new ArrayList<String>() : otherRSs;
|
|
||||||
// It's preferable to failover 1 RS at a time, but with good zk servers
|
// It's preferable to failover 1 RS at a time, but with good zk servers
|
||||||
// more could be processed at the same time.
|
// more could be processed at the same time.
|
||||||
int nbWorkers = conf.getInt("replication.executor.workers", 1);
|
int nbWorkers = conf.getInt("replication.executor.workers", 1);
|
||||||
|
@ -182,6 +179,7 @@ public class ReplicationSourceManager {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
synchronized (otherRegionServers) {
|
synchronized (otherRegionServers) {
|
||||||
|
refreshOtherRegionServersList();
|
||||||
LOG.info("Current list of replicators: " + currentReplicators
|
LOG.info("Current list of replicators: " + currentReplicators
|
||||||
+ " other RSs: " + otherRegionServers);
|
+ " other RSs: " + otherRegionServers);
|
||||||
}
|
}
|
||||||
|
@ -398,6 +396,27 @@ public class ReplicationSourceManager {
|
||||||
this.zkHelper.deleteSource(id, true);
|
this.zkHelper.deleteSource(id, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Reads the list of region servers from ZK and atomically clears our
|
||||||
|
* local view of it and replaces it with the updated list.
|
||||||
|
*
|
||||||
|
* @return true if the local list of the other region servers was updated
|
||||||
|
* with the ZK data (even if it was empty),
|
||||||
|
* false if the data was missing in ZK
|
||||||
|
*/
|
||||||
|
private boolean refreshOtherRegionServersList() {
|
||||||
|
List<String> newRsList = zkHelper.getRegisteredRegionServers();
|
||||||
|
if (newRsList == null) {
|
||||||
|
return false;
|
||||||
|
} else {
|
||||||
|
synchronized (otherRegionServers) {
|
||||||
|
otherRegionServers.clear();
|
||||||
|
otherRegionServers.addAll(newRsList);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Watcher used to be notified of the other region server's death
|
* Watcher used to be notified of the other region server's death
|
||||||
* in the local cluster. It initiates the process to transfer the queues
|
* in the local cluster. It initiates the process to transfer the queues
|
||||||
|
@ -417,7 +436,7 @@ public class ReplicationSourceManager {
|
||||||
* @param path full path of the new node
|
* @param path full path of the new node
|
||||||
*/
|
*/
|
||||||
public void nodeCreated(String path) {
|
public void nodeCreated(String path) {
|
||||||
refreshRegionServersList(path);
|
refreshListIfRightPath(path);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -428,7 +447,7 @@ public class ReplicationSourceManager {
|
||||||
if (stopper.isStopped()) {
|
if (stopper.isStopped()) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
boolean cont = refreshRegionServersList(path);
|
boolean cont = refreshListIfRightPath(path);
|
||||||
if (!cont) {
|
if (!cont) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -444,23 +463,14 @@ public class ReplicationSourceManager {
|
||||||
if (stopper.isStopped()) {
|
if (stopper.isStopped()) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
refreshRegionServersList(path);
|
refreshListIfRightPath(path);
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean refreshRegionServersList(String path) {
|
private boolean refreshListIfRightPath(String path) {
|
||||||
if (!path.startsWith(zkHelper.getZookeeperWatcher().rsZNode)) {
|
if (!path.startsWith(zkHelper.getZookeeperWatcher().rsZNode)) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
List<String> newRsList = (zkHelper.getRegisteredRegionServers());
|
return refreshOtherRegionServersList();
|
||||||
if (newRsList == null) {
|
|
||||||
return false;
|
|
||||||
} else {
|
|
||||||
synchronized (otherRegionServers) {
|
|
||||||
otherRegionServers.clear();
|
|
||||||
otherRegionServers.addAll(newRsList);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return true;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue