HBASE-26235 We could start RegionServerTracker before becoming active master (#3645)
Signed-off-by: Yulin Niu <niuyulin@apache.org>
parent c4daabd9c5
commit 889049eab6
ZNodePaths.java
@@ -22,6 +22,7 @@ import static org.apache.hadoop.hbase.HConstants.SPLIT_LOGDIR_NAME;
 import static org.apache.hadoop.hbase.HConstants.ZOOKEEPER_ZNODE_PARENT;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.RegionReplicaUtil;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -203,6 +204,10 @@ public class ZNodePaths {
       path.equals(tableZNode) || path.startsWith(tableZNode + "/");
   }
 
+  public String getRsPath(ServerName sn) {
+    return joinZNode(rsZNode, sn.toString());
+  }
+
   /**
    * Join the prefix znode name with the suffix znode name to generate a proper full znode name.
    * <p>
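The new helper centralizes construction of the per-region-server ephemeral znode path. A minimal illustration of what it resolves to; the "/hbase" parent and "rs" child shown here are just the defaults (both are configurable), and the class below is illustrative, not the HBase API:

import java.util.Objects;

public class RsPathExample {
  // same shape as ZNodePaths.joinZNode: prefix + "/" + suffix
  static String joinZNode(String prefix, String suffix) {
    return Objects.requireNonNull(prefix) + "/" + Objects.requireNonNull(suffix);
  }

  public static void main(String[] args) {
    String rsZNode = "/hbase/rs"; // default zookeeper.znode.parent plus the "rs" child
    // ServerName.toString() has the form "hostname,port,startcode"
    String sn = "host1.example.org,16020,1630000000000";
    System.out.println(joinZNode(rsZNode, sn));
    // -> /hbase/rs/host1.example.org,16020,1630000000000
  }
}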
ConnectionRegistryEndpoint.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hbase.client;
 
+import java.util.Collection;
 import java.util.List;
 import java.util.Optional;
 import org.apache.hadoop.hbase.HRegionLocation;
@@ -47,7 +48,7 @@ public interface ConnectionRegistryEndpoint {
   /**
    * Get all the region servers address.
    */
-  List<ServerName> getRegionServers();
+  Collection<ServerName> getRegionServers();
 
   /**
    * Get the location of meta regions.
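The return type is widened from List to Collection because the master side will now hand back the tracker's Set snapshot directly (see the HMaster hunks below), while region servers keep returning a List. A hypothetical caller compiles unchanged either way, since it only iterates the result:

import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;

class EndpointCaller {
  // Hypothetical caller: only iteration is needed, so Collection suffices
  // whether the endpoint is backed by a List (region server) or a Set (master).
  static int countServers(Collection<String> servers) {
    int n = 0;
    for (String ignored : servers) {
      n++;
    }
    return n;
  }

  public static void main(String[] args) {
    System.out.println(countServers(Arrays.asList("rs1", "rs2")));
    System.out.println(countServers(new HashSet<>(Arrays.asList("rs1", "rs2"))));
  }
}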
HMaster.java
@@ -275,7 +275,7 @@ public class HMaster extends HRegionServer implements MasterServices {
   // Manager and zk listener for master election
   private final ActiveMasterManager activeMasterManager;
   // Region server tracker
-  private RegionServerTracker regionServerTracker;
+  private final RegionServerTracker regionServerTracker;
   // Draining region server tracker
   private DrainingServerTracker drainingServerTracker;
   // Tracker for load balancer state
@@ -489,6 +489,8 @@ public class HMaster extends HRegionServer implements MasterServices {
       this.activeMasterManager = createActiveMasterManager(zooKeeper, serverName, this);
 
       cachedClusterId = new CachedClusterId(this, conf);
+
+      this.regionServerTracker = new RegionServerTracker(zooKeeper, this);
     } catch (Throwable t) {
       // Make sure we log the exception. HMaster is often started via reflection and the
       // cause of failed startup is lost.
@@ -928,8 +930,7 @@ public class HMaster extends HRegionServer implements MasterServices {
     // filesystem that COULD BE 'alive' (we'll schedule SCPs for each and let SCP figure it out).
     // We also pass dirs that are already 'splitting'... so we can do some checks down in tracker.
     // TODO: Generate the splitting and live Set in one pass instead of two as we currently do.
-    this.regionServerTracker = new RegionServerTracker(zooKeeper, this, this.serverManager);
-    this.regionServerTracker.start(
+    this.regionServerTracker.upgrade(
       procsByType.getOrDefault(ServerCrashProcedure.class, Collections.emptyList()).stream()
         .map(p -> (ServerCrashProcedure) p).map(p -> p.getServerName()).collect(Collectors.toSet()),
       walManager.getLiveServersFromWALDir(), walManager.getSplittingServersFromWALDir());
@@ -2726,8 +2727,8 @@ public class HMaster extends HRegionServer implements MasterServices {
   }
 
   @Override
-  public List<ServerName> getRegionServers() {
-    return serverManager.getOnlineServersList();
+  public Collection<ServerName> getRegionServers() {
+    return regionServerTracker.getRegionServers();
   }
 
   /**
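These HMaster hunks are the heart of HBASE-26235: the tracker is now constructed, and begins mirroring the ZK region server list, in the HMaster constructor, while finishActiveMasterInitialization merely upgrades it to active mode. A minimal model of that two-phase lifecycle; all names here are hypothetical, a sketch of the pattern rather than the HBase classes:

import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Hypothetical model: passive tracking from the constructor, active-master
// side effects only after upgrade() is called.
class TwoPhaseTracker {
  private volatile Set<String> servers = Collections.emptySet();
  private boolean active; // written under synchronized(this)

  TwoPhaseTracker(Set<String> initialFromZk) {
    refresh(initialFromZk); // phase 1: even a standby master tracks the list
  }

  synchronized void upgrade() {
    active = true; // phase 2: from now on, refresh() also expires/adds servers
  }

  synchronized void refresh(Set<String> fromZk) {
    Set<String> next = Collections.unmodifiableSet(new HashSet<>(fromZk));
    if (active) {
      // diff the old snapshot against next: expire vanished servers, record new ones
    }
    this.servers = next;
  }

  Set<String> getServers() {
    return servers; // lock-free read of the latest immutable snapshot
  }
}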
RegionServerTracker.java
@@ -20,8 +20,6 @@ package org.apache.hadoop.hbase.master;
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.util.Collections;
-import java.util.HashSet;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
@@ -31,18 +29,17 @@ import org.apache.hadoop.hbase.ServerMetrics;
 import org.apache.hadoop.hbase.ServerMetricsBuilder;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.client.VersionInfoUtil;
-import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.zookeeper.ZKListener;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
-import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
-import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
 import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
 
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionServerInfo;
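The import shuffle brings in the shaded guava Sets (used for set difference in the new processAsActiveMaster below) and keeps ThreadFactoryBuilder for the tracker's executor. The constructor comment in the next hunk explains why a dedicated single thread is used: all ZK listeners on the master fire sequentially on one event thread, so anything that makes ZK round-trips is pushed onto a private pool. A generic sketch of that pattern, using plain java.util.concurrent rather than the HBase/shaded classes:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Generic sketch: offload slow work from a shared event thread onto a
// single-threaded daemon executor so other listeners are not blocked.
class EventOffload {
  private final ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
    Thread t = new Thread(r, "tracker-refresh");
    t.setDaemon(true); // do not keep the JVM alive for background refreshes
    return t;
  });

  void onEvent() {
    executor.execute(this::refresh); // the event thread returns immediately
  }

  private void refresh() {
    // slow work, e.g. listing children from ZooKeeper, happens here
  }
}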
@@ -61,27 +58,27 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionServe
 @InterfaceAudience.Private
 public class RegionServerTracker extends ZKListener {
   private static final Logger LOG = LoggerFactory.getLogger(RegionServerTracker.class);
-  private final Set<ServerName> regionServers = new HashSet<>();
-  private final ServerManager serverManager;
+  // indicate whether we are active master
+  private boolean active;
+  private volatile Set<ServerName> regionServers = Collections.emptySet();
   private final MasterServices server;
   // As we need to send request to zk when processing the nodeChildrenChanged event, we'd better
   // move the operation to a single threaded thread pool in order to not block the zk event
   // processing since all the zk listener across HMaster will be called in one thread sequentially.
   private final ExecutorService executor;
 
-  public RegionServerTracker(ZKWatcher watcher, MasterServices server,
-    ServerManager serverManager) {
+  public RegionServerTracker(ZKWatcher watcher, MasterServices server) {
     super(watcher);
     this.server = server;
-    this.serverManager = serverManager;
     this.executor = Executors.newSingleThreadExecutor(
       new ThreadFactoryBuilder().setDaemon(true).setNameFormat("RegionServerTracker-%d").build());
+    watcher.registerListener(this);
+    refresh();
   }
 
-  private Pair<ServerName, RegionServerInfo> getServerInfo(String name)
-    throws KeeperException, IOException {
-    ServerName serverName = ServerName.parseServerName(name);
-    String nodePath = ZNodePaths.joinZNode(watcher.getZNodePaths().rsZNode, name);
+  private RegionServerInfo getServerInfo(ServerName serverName)
+    throws KeeperException, IOException {
+    String nodePath = watcher.getZNodePaths().getRsPath(serverName);
     byte[] data;
     try {
       data = ZKUtil.getData(watcher, nodePath);
@@ -91,24 +88,26 @@ public class RegionServerTracker extends ZKListener {
     if (data == null) {
       // we should receive a children changed event later and then we will expire it, so we still
       // need to add it to the region server set.
-      LOG.warn("Server node {} does not exist, already dead?", name);
-      return Pair.newPair(serverName, null);
+      LOG.warn("Server node {} does not exist, already dead?", serverName);
+      return null;
     }
     if (data.length == 0 || !ProtobufUtil.isPBMagicPrefix(data)) {
       // this should not happen actually, unless we have bugs or someone has messed zk up.
-      LOG.warn("Invalid data for region server node {} on zookeeper, data length = {}", name,
+      LOG.warn("Invalid data for region server node {} on zookeeper, data length = {}", serverName,
         data.length);
-      return Pair.newPair(serverName, null);
+      return null;
     }
     RegionServerInfo.Builder builder = RegionServerInfo.newBuilder();
     int magicLen = ProtobufUtil.lengthOfPBMagic();
     ProtobufUtil.mergeFrom(builder, data, magicLen, data.length - magicLen);
-    return Pair.newPair(serverName, builder.build());
+    return builder.build();
   }
 
   /**
-   * Starts the tracking of online RegionServers. All RSes will be tracked after this method is
-   * called.
+   * Upgrade to active master mode, where besides tracking the changes of the region server set,
+   * we will also start adding new region servers to ServerManager and schedule an SCP if a
+   * region server dies. Starts the tracking of online RegionServers. All RSes will be tracked
+   * after this method is called.
    * <p/>
    * In this method, we will also construct the region server sets in {@link ServerManager}. If a
    * region server is dead between the crash of the previous master instance and the start of the
@@ -119,38 +118,32 @@ public class RegionServerTracker extends ZKListener {
    * @param liveServersFromWALDir the live region servers from wal directory.
    * @param splittingServersFromWALDir Servers whose WALs are being actively 'split'.
    */
-  public void start(Set<ServerName> deadServersFromPE, Set<ServerName> liveServersFromWALDir,
-    Set<ServerName> splittingServersFromWALDir)
-    throws KeeperException, IOException {
-    LOG.info("Starting RegionServerTracker; {} have existing ServerCrashProcedures, {} " +
-      "possibly 'live' servers, and {} 'splitting'.", deadServersFromPE.size(),
-      liveServersFromWALDir.size(), splittingServersFromWALDir.size());
+  public void upgrade(Set<ServerName> deadServersFromPE, Set<ServerName> liveServersFromWALDir,
+    Set<ServerName> splittingServersFromWALDir) throws KeeperException, IOException {
+    LOG.info(
+      "Upgrading RegionServerTracker to active master mode; {} have existing " +
+        "ServerCrashProcedures, {} possibly 'live' servers, and {} 'splitting'.",
+      deadServersFromPE.size(), liveServersFromWALDir.size(), splittingServersFromWALDir.size());
     // deadServersFromPE is made from a list of outstanding ServerCrashProcedures.
     // splittingServersFromWALDir are being actively split -- the directory in the FS ends in
     // '-SPLITTING'. Each splitting server should have a corresponding SCP. Log if not.
     splittingServersFromWALDir.stream().filter(s -> !deadServersFromPE.contains(s)).
       forEach(s -> LOG.error("{} has no matching ServerCrashProcedure", s));
-    //create ServerNode for all possible live servers from wal directory
+    // create ServerNode for all possible live servers from wal directory
    liveServersFromWALDir
       .forEach(sn -> server.getAssignmentManager().getRegionStates().getOrCreateServer(sn));
-    watcher.registerListener(this);
+    ServerManager serverManager = server.getServerManager();
     synchronized (this) {
-      List<String> servers =
-        ZKUtil.listChildrenAndWatchForNewChildren(watcher, watcher.getZNodePaths().rsZNode);
-      if (null != servers) {
-        for (String n : servers) {
-          Pair<ServerName, RegionServerInfo> pair = getServerInfo(n);
-          ServerName serverName = pair.getFirst();
-          RegionServerInfo info = pair.getSecond();
-          regionServers.add(serverName);
-          ServerMetrics serverMetrics = info != null ?
-            ServerMetricsBuilder.of(serverName, VersionInfoUtil.getVersionNumber(info.getVersionInfo()),
-              info.getVersionInfo().getVersion()) :
-            ServerMetricsBuilder.of(serverName);
-          serverManager.checkAndRecordNewServer(serverName, serverMetrics);
-        }
+      Set<ServerName> liveServers = regionServers;
+      for (ServerName serverName : liveServers) {
+        RegionServerInfo info = getServerInfo(serverName);
+        ServerMetrics serverMetrics = info != null ? ServerMetricsBuilder.of(serverName,
+          VersionInfoUtil.getVersionNumber(info.getVersionInfo()),
+          info.getVersionInfo().getVersion()) : ServerMetricsBuilder.of(serverName);
+        serverManager.checkAndRecordNewServer(serverName, serverMetrics);
       }
       serverManager.findDeadServersAndProcess(deadServersFromPE, liveServersFromWALDir);
+      active = true;
     }
   }
@@ -158,32 +151,24 @@ public class RegionServerTracker extends ZKListener {
     executor.shutdownNow();
   }
 
-  private synchronized void refresh() {
-    List<String> names;
-    try {
-      names = ZKUtil.listChildrenAndWatchForNewChildren(watcher, watcher.getZNodePaths().rsZNode);
-    } catch (KeeperException e) {
-      // here we need to abort as we failed to set watcher on the rs node which means that we can
-      // not track the node deleted evetnt any more.
-      server.abort("Unexpected zk exception getting RS nodes", e);
-      return;
-    }
-    Set<ServerName> servers = CollectionUtils.isEmpty(names) ? Collections.emptySet() :
-      names.stream().map(ServerName::parseServerName).collect(Collectors.toSet());
+  public Set<ServerName> getRegionServers() {
+    return regionServers;
+  }
 
-    for (Iterator<ServerName> iter = regionServers.iterator(); iter.hasNext();) {
-      ServerName sn = iter.next();
-      if (!servers.contains(sn)) {
-        LOG.info("RegionServer ephemeral node deleted, processing expiration [{}]", sn);
-        serverManager.expireServer(sn);
-        iter.remove();
-      }
+  // execute the operations which are only needed for active masters, such as expire old servers,
+  // add new servers, etc.
+  private void processAsActiveMaster(Set<ServerName> newServers) {
+    Set<ServerName> oldServers = regionServers;
+    ServerManager serverManager = server.getServerManager();
+    // expire dead servers
+    for (ServerName crashedServer : Sets.difference(oldServers, newServers)) {
+      LOG.info("RegionServer ephemeral node deleted, processing expiration [{}]", crashedServer);
+      serverManager.expireServer(crashedServer);
     }
-    // here we do not need to parse the region server info as it is useless now, we only need the
-    // server name.
+    // check whether there are new servers, log them
     boolean newServerAdded = false;
-    for (ServerName sn : servers) {
-      if (regionServers.add(sn)) {
+    for (ServerName sn : newServers) {
+      if (!oldServers.contains(sn)) {
         newServerAdded = true;
         LOG.info("RegionServer ephemeral node created, adding [" + sn + "]");
       }
     }
@@ -195,6 +180,25 @@ public class RegionServerTracker extends ZKListener {
     }
   }
 
+  private synchronized void refresh() {
+    List<String> names;
+    try {
+      names = ZKUtil.listChildrenAndWatchForNewChildren(watcher, watcher.getZNodePaths().rsZNode);
+    } catch (KeeperException e) {
+      // here we need to abort as we failed to set watcher on the rs node which means that we can
+      // not track the node deleted event any more.
+      server.abort("Unexpected zk exception getting RS nodes", e);
+      return;
+    }
+    Set<ServerName> newServers = CollectionUtils.isEmpty(names) ? Collections.emptySet() :
+      names.stream().map(ServerName::parseServerName)
+        .collect(Collectors.collectingAndThen(Collectors.toSet(), Collections::unmodifiableSet));
+    if (active) {
+      processAsActiveMaster(newServers);
+    }
+    this.regionServers = newServers;
+  }
+
   @Override
   public void nodeChildrenChanged(String path) {
     if (path.equals(watcher.getZNodePaths().rsZNode) && !server.isAborted() &&
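The reworked refresh/processAsActiveMaster pair is a copy-on-write pattern: each children-changed event builds a fresh unmodifiable Set, the old and new snapshots are diffed while synchronized, and only then is the volatile field swapped, so readers never see a half-updated set. A self-contained sketch of the diffing step, using java.util streams in place of the shaded guava/collections4 helpers the real code uses:

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Collectors;

class SnapshotDiff {
  // Sketch of the refresh logic: compute departures and arrivals between the
  // previous snapshot and the freshly listed znode children.
  static void diff(Set<String> oldServers, Set<String> newServers) {
    Set<String> expired = oldServers.stream()
      .filter(s -> !newServers.contains(s)).collect(Collectors.toSet());
    Set<String> added = newServers.stream()
      .filter(s -> !oldServers.contains(s)).collect(Collectors.toSet());
    expired.forEach(s -> System.out.println("expire " + s));   // -> expireServer / SCP
    added.forEach(s -> System.out.println("new server " + s)); // -> checkAndRecordNewServer
  }

  public static void main(String[] args) {
    Set<String> before = new HashSet<>(Arrays.asList("rs1", "rs2"));
    Set<String> after = new HashSet<>(Arrays.asList("rs2", "rs3"));
    diff(before, after); // expire rs1, new server rs3
  }
}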
HRegionServer.java
@@ -188,7 +188,6 @@ import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
 import org.apache.hadoop.hbase.zookeeper.ZKNodeTracker;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
-import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -686,7 +685,12 @@ public class HRegionServer extends Thread implements RegionServerServices, LastS
     }
     this.rpcServices.start(zooKeeper);
     this.metaRegionLocationCache = new MetaRegionLocationCache(zooKeeper);
-    this.regionServerAddressTracker = new RegionServerAddressTracker(zooKeeper, this);
+    if (!(this instanceof HMaster)) {
+      // do not create this field for HMaster, we have another region server tracker for HMaster.
+      this.regionServerAddressTracker = new RegionServerAddressTracker(zooKeeper, this);
+    } else {
+      this.regionServerAddressTracker = null;
+    }
     // This violates 'no starting stuff in Constructor' but Master depends on the below chore
     // and executor being created and takes a different startup route. Lots of overlap between HRS
     // and M (An M IS A HRS now). Need to refactor so less duplication between M and its super
@@ -3608,7 +3612,7 @@ public class HRegionServer extends Thread implements RegionServerServices, LastS
   }
 
   private String getMyEphemeralNodePath() {
-    return ZNodePaths.joinZNode(this.zooKeeper.getZNodePaths().rsZNode, getServerName().toString());
+    return zooKeeper.getZNodePaths().getRsPath(serverName);
   }
 
   private boolean isHealthCheckerConfigured() {
@@ -3995,7 +3999,7 @@ public class HRegionServer extends Thread implements RegionServerServices, LastS
   }
 
   @Override
-  public List<ServerName> getRegionServers() {
+  public Collection<ServerName> getRegionServers() {
     return regionServerAddressTracker.getRegionServers();
   }