From 0e29d62a2d46245adea8ac36e61f96f842b3bf77 Mon Sep 17 00:00:00 2001 From: Duo Zhang Date: Wed, 11 Sep 2019 22:10:52 +0800 Subject: [PATCH] HBASE-22987 Calculate the region servers in default group in foreground (#599) Signed-off-by: Guanghao Zhang --- .../hbase/rsgroup/RSGroupInfoManagerImpl.java | 138 ++++-------------- 1 file changed, 32 insertions(+), 106 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManagerImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManagerImpl.java index 67250669ea9..7224869192b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManagerImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManagerImpl.java @@ -30,6 +30,7 @@ import java.util.OptionalLong; import java.util.Set; import java.util.SortedSet; import java.util.TreeSet; +import java.util.stream.Collectors; import org.apache.hadoop.hbase.Coprocessor; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.NamespaceDescriptor; @@ -174,8 +175,6 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager { private final RSGroupStartupWorker rsGroupStartupWorker; // contains list of groups that were last flushed to persistent store private Set prevRSGroups = new HashSet<>(); - private final ServerEventsListenerThread serverEventsListenerThread = - new ServerEventsListenerThread(); private RSGroupInfoManagerImpl(MasterServices masterServices) throws IOException { this.masterServices = masterServices; @@ -184,11 +183,34 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager { this.rsGroupStartupWorker = new RSGroupStartupWorker(); } + private synchronized void updateDefaultServers() { + LOG.info("Updating default servers."); + Map newGroupMap = Maps.newHashMap(holder.groupName2Group); + RSGroupInfo oldDefaultGroupInfo = getRSGroup(RSGroupInfo.DEFAULT_GROUP); + assert oldDefaultGroupInfo != null; + RSGroupInfo newDefaultGroupInfo = + new RSGroupInfo(RSGroupInfo.DEFAULT_GROUP, getDefaultServers()); + newDefaultGroupInfo.addAllTables(oldDefaultGroupInfo.getTables()); + newGroupMap.put(RSGroupInfo.DEFAULT_GROUP, newDefaultGroupInfo); + // do not need to persist, as we do not persist default group. + resetRSGroupMap(newGroupMap); + LOG.info("Updated default servers, {} servers", newDefaultGroupInfo.getServers().size()); + } private synchronized void init() throws IOException { refresh(false); - serverEventsListenerThread.start(); - masterServices.getServerManager().registerListener(serverEventsListenerThread); + masterServices.getServerManager().registerListener(new ServerListener() { + + @Override + public void serverAdded(ServerName serverName) { + updateDefaultServers(); + } + + @Override + public void serverRemoved(ServerName serverName) { + updateDefaultServers(); + } + }); migrate(); } @@ -225,19 +247,11 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager { } /** - * @param master the master to get online servers for * @return Set of online Servers named for their hostname and port (not ServerName). */ - private static Set
getOnlineServers(final MasterServices master) { - Set
onlineServers = new HashSet
(); - if (master == null) { - return onlineServers; - } - - for (ServerName server : master.getServerManager().getOnlineServers().keySet()) { - onlineServers.add(server.getAddress()); - } - return onlineServers; + private Set
getOnlineServers() { + return masterServices.getServerManager().getOnlineServers().keySet().stream() + .map(ServerName::getAddress).collect(Collectors.toSet()); } @Override @@ -249,8 +263,7 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager { // it. If not 'default' group, add server to 'dst' rsgroup EVEN IF IT IS NOT online (could be a // rsgroup of dead servers that are to come back later). Set
onlineServers = - dst.getName().equals(RSGroupInfo.DEFAULT_GROUP) ? getOnlineServers(this.masterServices) - : null; + dst.getName().equals(RSGroupInfo.DEFAULT_GROUP) ? getOnlineServers() : null; for (Address el : servers) { src.removeServer(el); if (onlineServers != null) { @@ -617,25 +630,8 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager { this.prevRSGroups.addAll(currentGroups); } - // Called by getDefaultServers. Presume it has lock in place. - private List getOnlineRS() throws IOException { - if (masterServices != null) { - return masterServices.getServerManager().getOnlineServersList(); - } - LOG.debug("Reading online RS from zookeeper"); - List servers = new ArrayList<>(); - try { - for (String el : ZKUtil.listChildrenNoWatch(watcher, watcher.getZNodePaths().rsZNode)) { - servers.add(ServerName.parseServerName(el)); - } - } catch (KeeperException e) { - throw new IOException("Failed to retrieve server list from zookeeper", e); - } - return servers; - } - // Called by ServerEventsListenerThread. Presume it has lock on this manager when it runs. - private SortedSet
getDefaultServers() throws IOException { + private SortedSet
getDefaultServers() { // Build a list of servers in other groups than default group, from rsGroupMap Set
serversInOtherGroup = new HashSet<>(); for (RSGroupInfo group : listRSGroups() /* get from rsGroupMap */) { @@ -646,7 +642,7 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager { // Get all online servers from Zookeeper and find out servers in default group SortedSet
defaultServers = Sets.newTreeSet(); - for (ServerName serverName : getOnlineRS()) { + for (ServerName serverName : masterServices.getServerManager().getOnlineServers().keySet()) { Address server = Address.fromParts(serverName.getHostname(), serverName.getPort()); if (!serversInOtherGroup.contains(server)) { // not in other groups defaultServers.add(server); @@ -655,76 +651,6 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager { return defaultServers; } - // Called by ServerEventsListenerThread. Synchronize on this because redoing - // the rsGroupMap then writing it out. - private synchronized void updateDefaultServers(SortedSet
servers) { - Map rsGroupMap = holder.groupName2Group; - RSGroupInfo info = rsGroupMap.get(RSGroupInfo.DEFAULT_GROUP); - RSGroupInfo newInfo = new RSGroupInfo(info.getName(), servers); - HashMap newGroupMap = Maps.newHashMap(rsGroupMap); - newGroupMap.put(newInfo.getName(), newInfo); - resetRSGroupMap(newGroupMap); - } - - /** - * Calls {@link RSGroupInfoManagerImpl#updateDefaultServers(SortedSet)} to update list of known - * servers. Notifications about server changes are received by registering {@link ServerListener}. - * As a listener, we need to return immediately, so the real work of updating the servers is done - * asynchronously in this thread. - */ - private class ServerEventsListenerThread extends Thread implements ServerListener { - private final Logger LOG = LoggerFactory.getLogger(ServerEventsListenerThread.class); - private boolean changed = false; - - ServerEventsListenerThread() { - setDaemon(true); - } - - @Override - public void serverAdded(ServerName serverName) { - serverChanged(); - } - - @Override - public void serverRemoved(ServerName serverName) { - serverChanged(); - } - - private synchronized void serverChanged() { - changed = true; - this.notify(); - } - - @Override - public void run() { - setName(ServerEventsListenerThread.class.getName() + "-" + masterServices.getServerName()); - SortedSet
prevDefaultServers = new TreeSet<>(); - while (isMasterRunning(masterServices)) { - try { - LOG.info("Updating default servers."); - SortedSet
servers = RSGroupInfoManagerImpl.this.getDefaultServers(); - if (!servers.equals(prevDefaultServers)) { - RSGroupInfoManagerImpl.this.updateDefaultServers(servers); - prevDefaultServers = servers; - LOG.info("Updated with servers: " + servers.size()); - } - try { - synchronized (this) { - while (!changed) { - wait(); - } - changed = false; - } - } catch (InterruptedException e) { - LOG.warn("Interrupted", e); - } - } catch (IOException e) { - LOG.warn("Failed to update default servers", e); - } - } - } - } - private class RSGroupStartupWorker extends Thread { private final Logger LOG = LoggerFactory.getLogger(RSGroupStartupWorker.class); private volatile boolean online = false;