HBASE-22987 Calculate the region servers in default group in foreground (#599)
Signed-off-by: Guanghao Zhang <zghao@apache.org>
This commit is contained in:
parent
2bf2781df9
commit
0e29d62a2d
|
@ -30,6 +30,7 @@ import java.util.OptionalLong;
|
|||
import java.util.Set;
|
||||
import java.util.SortedSet;
|
||||
import java.util.TreeSet;
|
||||
import java.util.stream.Collectors;
|
||||
import org.apache.hadoop.hbase.Coprocessor;
|
||||
import org.apache.hadoop.hbase.DoNotRetryIOException;
|
||||
import org.apache.hadoop.hbase.NamespaceDescriptor;
|
||||
|
@ -174,8 +175,6 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
|
|||
private final RSGroupStartupWorker rsGroupStartupWorker;
|
||||
// contains list of groups that were last flushed to persistent store
|
||||
private Set<String> prevRSGroups = new HashSet<>();
|
||||
private final ServerEventsListenerThread serverEventsListenerThread =
|
||||
new ServerEventsListenerThread();
|
||||
|
||||
private RSGroupInfoManagerImpl(MasterServices masterServices) throws IOException {
|
||||
this.masterServices = masterServices;
|
||||
|
@ -184,11 +183,34 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
|
|||
this.rsGroupStartupWorker = new RSGroupStartupWorker();
|
||||
}
|
||||
|
||||
private synchronized void updateDefaultServers() {
|
||||
LOG.info("Updating default servers.");
|
||||
Map<String, RSGroupInfo> newGroupMap = Maps.newHashMap(holder.groupName2Group);
|
||||
RSGroupInfo oldDefaultGroupInfo = getRSGroup(RSGroupInfo.DEFAULT_GROUP);
|
||||
assert oldDefaultGroupInfo != null;
|
||||
RSGroupInfo newDefaultGroupInfo =
|
||||
new RSGroupInfo(RSGroupInfo.DEFAULT_GROUP, getDefaultServers());
|
||||
newDefaultGroupInfo.addAllTables(oldDefaultGroupInfo.getTables());
|
||||
newGroupMap.put(RSGroupInfo.DEFAULT_GROUP, newDefaultGroupInfo);
|
||||
// do not need to persist, as we do not persist default group.
|
||||
resetRSGroupMap(newGroupMap);
|
||||
LOG.info("Updated default servers, {} servers", newDefaultGroupInfo.getServers().size());
|
||||
}
|
||||
|
||||
private synchronized void init() throws IOException {
|
||||
refresh(false);
|
||||
serverEventsListenerThread.start();
|
||||
masterServices.getServerManager().registerListener(serverEventsListenerThread);
|
||||
masterServices.getServerManager().registerListener(new ServerListener() {
|
||||
|
||||
@Override
|
||||
public void serverAdded(ServerName serverName) {
|
||||
updateDefaultServers();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void serverRemoved(ServerName serverName) {
|
||||
updateDefaultServers();
|
||||
}
|
||||
});
|
||||
migrate();
|
||||
}
|
||||
|
||||
|
@ -225,19 +247,11 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
|
|||
}
|
||||
|
||||
/**
|
||||
* @param master the master to get online servers for
|
||||
* @return Set of online Servers named for their hostname and port (not ServerName).
|
||||
*/
|
||||
private static Set<Address> getOnlineServers(final MasterServices master) {
|
||||
Set<Address> onlineServers = new HashSet<Address>();
|
||||
if (master == null) {
|
||||
return onlineServers;
|
||||
}
|
||||
|
||||
for (ServerName server : master.getServerManager().getOnlineServers().keySet()) {
|
||||
onlineServers.add(server.getAddress());
|
||||
}
|
||||
return onlineServers;
|
||||
private Set<Address> getOnlineServers() {
|
||||
return masterServices.getServerManager().getOnlineServers().keySet().stream()
|
||||
.map(ServerName::getAddress).collect(Collectors.toSet());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -249,8 +263,7 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
|
|||
// it. If not 'default' group, add server to 'dst' rsgroup EVEN IF IT IS NOT online (could be a
|
||||
// rsgroup of dead servers that are to come back later).
|
||||
Set<Address> onlineServers =
|
||||
dst.getName().equals(RSGroupInfo.DEFAULT_GROUP) ? getOnlineServers(this.masterServices)
|
||||
: null;
|
||||
dst.getName().equals(RSGroupInfo.DEFAULT_GROUP) ? getOnlineServers() : null;
|
||||
for (Address el : servers) {
|
||||
src.removeServer(el);
|
||||
if (onlineServers != null) {
|
||||
|
@ -617,25 +630,8 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
|
|||
this.prevRSGroups.addAll(currentGroups);
|
||||
}
|
||||
|
||||
// Called by getDefaultServers. Presume it has lock in place.
|
||||
private List<ServerName> getOnlineRS() throws IOException {
|
||||
if (masterServices != null) {
|
||||
return masterServices.getServerManager().getOnlineServersList();
|
||||
}
|
||||
LOG.debug("Reading online RS from zookeeper");
|
||||
List<ServerName> servers = new ArrayList<>();
|
||||
try {
|
||||
for (String el : ZKUtil.listChildrenNoWatch(watcher, watcher.getZNodePaths().rsZNode)) {
|
||||
servers.add(ServerName.parseServerName(el));
|
||||
}
|
||||
} catch (KeeperException e) {
|
||||
throw new IOException("Failed to retrieve server list from zookeeper", e);
|
||||
}
|
||||
return servers;
|
||||
}
|
||||
|
||||
// Called by ServerEventsListenerThread. Presume it has lock on this manager when it runs.
|
||||
private SortedSet<Address> getDefaultServers() throws IOException {
|
||||
private SortedSet<Address> getDefaultServers() {
|
||||
// Build a list of servers in other groups than default group, from rsGroupMap
|
||||
Set<Address> serversInOtherGroup = new HashSet<>();
|
||||
for (RSGroupInfo group : listRSGroups() /* get from rsGroupMap */) {
|
||||
|
@ -646,7 +642,7 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
|
|||
|
||||
// Get all online servers from Zookeeper and find out servers in default group
|
||||
SortedSet<Address> defaultServers = Sets.newTreeSet();
|
||||
for (ServerName serverName : getOnlineRS()) {
|
||||
for (ServerName serverName : masterServices.getServerManager().getOnlineServers().keySet()) {
|
||||
Address server = Address.fromParts(serverName.getHostname(), serverName.getPort());
|
||||
if (!serversInOtherGroup.contains(server)) { // not in other groups
|
||||
defaultServers.add(server);
|
||||
|
@ -655,76 +651,6 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
|
|||
return defaultServers;
|
||||
}
|
||||
|
||||
// Called by ServerEventsListenerThread. Synchronize on this because redoing
|
||||
// the rsGroupMap then writing it out.
|
||||
private synchronized void updateDefaultServers(SortedSet<Address> servers) {
|
||||
Map<String, RSGroupInfo> rsGroupMap = holder.groupName2Group;
|
||||
RSGroupInfo info = rsGroupMap.get(RSGroupInfo.DEFAULT_GROUP);
|
||||
RSGroupInfo newInfo = new RSGroupInfo(info.getName(), servers);
|
||||
HashMap<String, RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
|
||||
newGroupMap.put(newInfo.getName(), newInfo);
|
||||
resetRSGroupMap(newGroupMap);
|
||||
}
|
||||
|
||||
/**
|
||||
* Calls {@link RSGroupInfoManagerImpl#updateDefaultServers(SortedSet)} to update list of known
|
||||
* servers. Notifications about server changes are received by registering {@link ServerListener}.
|
||||
* As a listener, we need to return immediately, so the real work of updating the servers is done
|
||||
* asynchronously in this thread.
|
||||
*/
|
||||
private class ServerEventsListenerThread extends Thread implements ServerListener {
|
||||
private final Logger LOG = LoggerFactory.getLogger(ServerEventsListenerThread.class);
|
||||
private boolean changed = false;
|
||||
|
||||
ServerEventsListenerThread() {
|
||||
setDaemon(true);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void serverAdded(ServerName serverName) {
|
||||
serverChanged();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void serverRemoved(ServerName serverName) {
|
||||
serverChanged();
|
||||
}
|
||||
|
||||
private synchronized void serverChanged() {
|
||||
changed = true;
|
||||
this.notify();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
setName(ServerEventsListenerThread.class.getName() + "-" + masterServices.getServerName());
|
||||
SortedSet<Address> prevDefaultServers = new TreeSet<>();
|
||||
while (isMasterRunning(masterServices)) {
|
||||
try {
|
||||
LOG.info("Updating default servers.");
|
||||
SortedSet<Address> servers = RSGroupInfoManagerImpl.this.getDefaultServers();
|
||||
if (!servers.equals(prevDefaultServers)) {
|
||||
RSGroupInfoManagerImpl.this.updateDefaultServers(servers);
|
||||
prevDefaultServers = servers;
|
||||
LOG.info("Updated with servers: " + servers.size());
|
||||
}
|
||||
try {
|
||||
synchronized (this) {
|
||||
while (!changed) {
|
||||
wait();
|
||||
}
|
||||
changed = false;
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
LOG.warn("Interrupted", e);
|
||||
}
|
||||
} catch (IOException e) {
|
||||
LOG.warn("Failed to update default servers", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private class RSGroupStartupWorker extends Thread {
|
||||
private final Logger LOG = LoggerFactory.getLogger(RSGroupStartupWorker.class);
|
||||
private volatile boolean online = false;
|
||||
|
|
Loading…
Reference in New Issue