HBASE-25553 It is better for ReplicationTracker.getListOfRegionServers to return ServerName instead of String (#2928)
Signed-off-by: Wellington Chevreuil <wchevreuil@apache.org> Signed-off-by: Viraj Jasani <vjasani@apache.org>
This commit is contained in:
parent
829790e11a
commit
87e516da6c
|
@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.replication;
|
||||||
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.ServerName;
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -37,13 +38,13 @@ public interface ReplicationTracker {
|
||||||
* Register a replication listener to receive replication events.
|
* Register a replication listener to receive replication events.
|
||||||
* @param listener
|
* @param listener
|
||||||
*/
|
*/
|
||||||
public void registerListener(ReplicationListener listener);
|
void registerListener(ReplicationListener listener);
|
||||||
|
|
||||||
public void removeListener(ReplicationListener listener);
|
void removeListener(ReplicationListener listener);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns a list of other live region servers in the cluster.
|
* Returns a list of other live region servers in the cluster.
|
||||||
* @return List of region servers.
|
* @return List of region servers.
|
||||||
*/
|
*/
|
||||||
public List<String> getListOfRegionServers();
|
List<ServerName> getListOfRegionServers();
|
||||||
}
|
}
|
||||||
|
|
|
@ -20,7 +20,10 @@ package org.apache.hadoop.hbase.replication;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.concurrent.CopyOnWriteArrayList;
|
import java.util.concurrent.CopyOnWriteArrayList;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
import org.apache.hadoop.hbase.Abortable;
|
import org.apache.hadoop.hbase.Abortable;
|
||||||
|
import org.apache.hadoop.hbase.ServerName;
|
||||||
import org.apache.hadoop.hbase.Stoppable;
|
import org.apache.hadoop.hbase.Stoppable;
|
||||||
import org.apache.hadoop.hbase.zookeeper.ZKListener;
|
import org.apache.hadoop.hbase.zookeeper.ZKListener;
|
||||||
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
|
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
|
||||||
|
@ -49,7 +52,7 @@ public class ReplicationTrackerZKImpl implements ReplicationTracker {
|
||||||
// listeners to be notified
|
// listeners to be notified
|
||||||
private final List<ReplicationListener> listeners = new CopyOnWriteArrayList<>();
|
private final List<ReplicationListener> listeners = new CopyOnWriteArrayList<>();
|
||||||
// List of all the other region servers in this cluster
|
// List of all the other region servers in this cluster
|
||||||
private final ArrayList<String> otherRegionServers = new ArrayList<>();
|
private final List<ServerName> otherRegionServers = new ArrayList<>();
|
||||||
|
|
||||||
public ReplicationTrackerZKImpl(ZKWatcher zookeeper, Abortable abortable, Stoppable stopper) {
|
public ReplicationTrackerZKImpl(ZKWatcher zookeeper, Abortable abortable, Stoppable stopper) {
|
||||||
this.zookeeper = zookeeper;
|
this.zookeeper = zookeeper;
|
||||||
|
@ -74,10 +77,10 @@ public class ReplicationTrackerZKImpl implements ReplicationTracker {
|
||||||
* Return a snapshot of the current region servers.
|
* Return a snapshot of the current region servers.
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public List<String> getListOfRegionServers() {
|
public List<ServerName> getListOfRegionServers() {
|
||||||
refreshOtherRegionServersList(false);
|
refreshOtherRegionServersList(false);
|
||||||
|
|
||||||
List<String> list = null;
|
List<ServerName> list = null;
|
||||||
synchronized (otherRegionServers) {
|
synchronized (otherRegionServers) {
|
||||||
list = new ArrayList<>(otherRegionServers);
|
list = new ArrayList<>(otherRegionServers);
|
||||||
}
|
}
|
||||||
|
@ -162,7 +165,7 @@ public class ReplicationTrackerZKImpl implements ReplicationTracker {
|
||||||
* if it was empty), false if the data was missing in ZK
|
* if it was empty), false if the data was missing in ZK
|
||||||
*/
|
*/
|
||||||
private boolean refreshOtherRegionServersList(boolean watch) {
|
private boolean refreshOtherRegionServersList(boolean watch) {
|
||||||
List<String> newRsList = getRegisteredRegionServers(watch);
|
List<ServerName> newRsList = getRegisteredRegionServers(watch);
|
||||||
if (newRsList == null) {
|
if (newRsList == null) {
|
||||||
return false;
|
return false;
|
||||||
} else {
|
} else {
|
||||||
|
@ -178,7 +181,7 @@ public class ReplicationTrackerZKImpl implements ReplicationTracker {
|
||||||
* Get a list of all the other region servers in this cluster and set a watch
|
* Get a list of all the other region servers in this cluster and set a watch
|
||||||
* @return a list of server nanes
|
* @return a list of server nanes
|
||||||
*/
|
*/
|
||||||
private List<String> getRegisteredRegionServers(boolean watch) {
|
private List<ServerName> getRegisteredRegionServers(boolean watch) {
|
||||||
List<String> result = null;
|
List<String> result = null;
|
||||||
try {
|
try {
|
||||||
if (watch) {
|
if (watch) {
|
||||||
|
@ -190,6 +193,7 @@ public class ReplicationTrackerZKImpl implements ReplicationTracker {
|
||||||
} catch (KeeperException e) {
|
} catch (KeeperException e) {
|
||||||
this.abortable.abort("Get list of registered region servers", e);
|
this.abortable.abort("Get list of registered region servers", e);
|
||||||
}
|
}
|
||||||
return result;
|
return result == null ? null :
|
||||||
|
result.stream().map(ServerName::parseServerName).collect(Collectors.toList());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -311,7 +311,7 @@ public class DumpReplicationQueues extends Configured implements Tool {
|
||||||
queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(zkw, getConf());
|
queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(zkw, getConf());
|
||||||
replicationTracker = ReplicationFactory.getReplicationTracker(zkw, new WarnOnlyAbortable(),
|
replicationTracker = ReplicationFactory.getReplicationTracker(zkw, new WarnOnlyAbortable(),
|
||||||
new WarnOnlyStoppable());
|
new WarnOnlyStoppable());
|
||||||
Set<String> liveRegionServers = new HashSet<>(replicationTracker.getListOfRegionServers());
|
Set<ServerName> liveRegionServers = new HashSet<>(replicationTracker.getListOfRegionServers());
|
||||||
|
|
||||||
// Loops each peer on each RS and dumps the queues
|
// Loops each peer on each RS and dumps the queues
|
||||||
List<ServerName> regionservers = queueStorage.getListOfReplicators();
|
List<ServerName> regionservers = queueStorage.getListOfReplicators();
|
||||||
|
@ -320,7 +320,7 @@ public class DumpReplicationQueues extends Configured implements Tool {
|
||||||
}
|
}
|
||||||
for (ServerName regionserver : regionservers) {
|
for (ServerName regionserver : regionservers) {
|
||||||
List<String> queueIds = queueStorage.getAllQueues(regionserver);
|
List<String> queueIds = queueStorage.getAllQueues(regionserver);
|
||||||
if (!liveRegionServers.contains(regionserver.getServerName())) {
|
if (!liveRegionServers.contains(regionserver)) {
|
||||||
deadRegionServers.add(regionserver.getServerName());
|
deadRegionServers.add(regionserver.getServerName());
|
||||||
}
|
}
|
||||||
for (String queueId : queueIds) {
|
for (String queueId : queueIds) {
|
||||||
|
|
|
@ -266,8 +266,7 @@ public class ReplicationSourceManager implements ReplicationListener {
|
||||||
if (currentReplicators == null || currentReplicators.isEmpty()) {
|
if (currentReplicators == null || currentReplicators.isEmpty()) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
List<ServerName> otherRegionServers = replicationTracker.getListOfRegionServers().stream()
|
List<ServerName> otherRegionServers = replicationTracker.getListOfRegionServers();
|
||||||
.map(ServerName::valueOf).collect(Collectors.toList());
|
|
||||||
LOG.info(
|
LOG.info(
|
||||||
"Current list of replicators: " + currentReplicators + " other RSs: " + otherRegionServers);
|
"Current list of replicators: " + currentReplicators + " other RSs: " + otherRegionServers);
|
||||||
|
|
||||||
|
|
|
@ -115,26 +115,26 @@ public class TestReplicationTrackerZKImpl {
|
||||||
assertEquals(0, rt.getListOfRegionServers().size());
|
assertEquals(0, rt.getListOfRegionServers().size());
|
||||||
|
|
||||||
// 1 region server
|
// 1 region server
|
||||||
ZKUtil.createWithParents(zkw,
|
ZKUtil.createWithParents(zkw, ZNodePaths.joinZNode(zkw.getZNodePaths().rsZNode,
|
||||||
ZNodePaths.joinZNode(zkw.getZNodePaths().rsZNode, "hostname1.example.org:1234"));
|
"hostname1.example.org,1234,1611218678009"));
|
||||||
List<String> rss = rt.getListOfRegionServers();
|
List<ServerName> rss = rt.getListOfRegionServers();
|
||||||
assertEquals(rss.toString(), 1, rss.size());
|
assertEquals(rss.toString(), 1, rss.size());
|
||||||
|
|
||||||
// 2 region servers
|
// 2 region servers
|
||||||
ZKUtil.createWithParents(zkw,
|
ZKUtil.createWithParents(zkw, ZNodePaths.joinZNode(zkw.getZNodePaths().rsZNode,
|
||||||
ZNodePaths.joinZNode(zkw.getZNodePaths().rsZNode, "hostname2.example.org:1234"));
|
"hostname2.example.org,1234,1611218678009"));
|
||||||
rss = rt.getListOfRegionServers();
|
rss = rt.getListOfRegionServers();
|
||||||
assertEquals(rss.toString(), 2, rss.size());
|
assertEquals(rss.toString(), 2, rss.size());
|
||||||
|
|
||||||
// 1 region server
|
// 1 region server
|
||||||
ZKUtil.deleteNode(zkw,
|
ZKUtil.deleteNode(zkw, ZNodePaths.joinZNode(zkw.getZNodePaths().rsZNode,
|
||||||
ZNodePaths.joinZNode(zkw.getZNodePaths().rsZNode, "hostname2.example.org:1234"));
|
"hostname2.example.org,1234,1611218678009"));
|
||||||
rss = rt.getListOfRegionServers();
|
rss = rt.getListOfRegionServers();
|
||||||
assertEquals(1, rss.size());
|
assertEquals(1, rss.size());
|
||||||
|
|
||||||
// 0 region server
|
// 0 region server
|
||||||
ZKUtil.deleteNode(zkw,
|
ZKUtil.deleteNode(zkw, ZNodePaths.joinZNode(zkw.getZNodePaths().rsZNode,
|
||||||
ZNodePaths.joinZNode(zkw.getZNodePaths().rsZNode, "hostname1.example.org:1234"));
|
"hostname1.example.org,1234,1611218678009"));
|
||||||
rss = rt.getListOfRegionServers();
|
rss = rt.getListOfRegionServers();
|
||||||
assertEquals(rss.toString(), 0, rss.size());
|
assertEquals(rss.toString(), 0, rss.size());
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue