diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index 97af2e8b5a7..4a3ed9048d3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -81,7 +81,7 @@ public class ReplicationSourceManager { // The path to the latest log we saw, for new coming sources private Path latestPath; // List of all the other region servers in this cluster - private final List otherRegionServers; + private final List otherRegionServers = new ArrayList(); // Path to the hlogs directories private final Path logDir; // Path to the hlog archive @@ -122,12 +122,9 @@ public class ReplicationSourceManager { this.sleepBeforeFailover = conf.getLong("replication.sleep.before.failover", 2000); this.zkHelper.registerRegionServerListener( new OtherRegionServerWatcher(this.zkHelper.getZookeeperWatcher())); - List otherRSs = - this.zkHelper.getRegisteredRegionServers(); this.zkHelper.registerRegionServerListener( new PeersWatcher(this.zkHelper.getZookeeperWatcher())); this.zkHelper.listPeersIdsAndWatch(); - this.otherRegionServers = otherRSs == null ? new ArrayList() : otherRSs; // It's preferable to failover 1 RS at a time, but with good zk servers // more could be processed at the same time. int nbWorkers = conf.getInt("replication.executor.workers", 1); @@ -182,6 +179,7 @@ public class ReplicationSourceManager { return; } synchronized (otherRegionServers) { + refreshOtherRegionServersList(); LOG.info("Current list of replicators: " + currentReplicators + " other RSs: " + otherRegionServers); } @@ -398,6 +396,27 @@ public class ReplicationSourceManager { this.zkHelper.deleteSource(id, true); } + /** + * Reads the list of region servers from ZK and atomically clears our + * local view of it and replaces it with the updated list. + * + * @return true if the local list of the other region servers was updated + * with the ZK data (even if it was empty), + * false if the data was missing in ZK + */ + private boolean refreshOtherRegionServersList() { + List newRsList = zkHelper.getRegisteredRegionServers(); + if (newRsList == null) { + return false; + } else { + synchronized (otherRegionServers) { + otherRegionServers.clear(); + otherRegionServers.addAll(newRsList); + } + } + return true; + } + /** * Watcher used to be notified of the other region server's death * in the local cluster. It initiates the process to transfer the queues @@ -417,7 +436,7 @@ public class ReplicationSourceManager { * @param path full path of the new node */ public void nodeCreated(String path) { - refreshRegionServersList(path); + refreshListIfRightPath(path); } /** @@ -428,7 +447,7 @@ public class ReplicationSourceManager { if (stopper.isStopped()) { return; } - boolean cont = refreshRegionServersList(path); + boolean cont = refreshListIfRightPath(path); if (!cont) { return; } @@ -444,23 +463,14 @@ public class ReplicationSourceManager { if (stopper.isStopped()) { return; } - refreshRegionServersList(path); + refreshListIfRightPath(path); } - private boolean refreshRegionServersList(String path) { + private boolean refreshListIfRightPath(String path) { if (!path.startsWith(zkHelper.getZookeeperWatcher().rsZNode)) { return false; } - List newRsList = (zkHelper.getRegisteredRegionServers()); - if (newRsList == null) { - return false; - } else { - synchronized (otherRegionServers) { - otherRegionServers.clear(); - otherRegionServers.addAll(newRsList); - } - } - return true; + return refreshOtherRegionServersList(); } }