HBASE-21380 Filter finished SCP at start
This commit is contained in:
parent
7de5f1d60d
commit
127de9e637
|
@ -121,17 +121,26 @@ public class DeadServer {
|
|||
return clone;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Adds the server to the dead server list if it's not there already.
|
||||
* @param sn the server name
|
||||
*/
|
||||
public synchronized void add(ServerName sn) {
|
||||
add(sn, true);
|
||||
}
|
||||
|
||||
/**
|
||||
* Adds the server to the dead server list if it's not there already.
|
||||
* @param sn the server name
|
||||
* @param processing whether there is an active SCP associated with the server
|
||||
*/
|
||||
public synchronized void add(ServerName sn, boolean processing) {
|
||||
if (!deadServers.containsKey(sn)){
|
||||
deadServers.put(sn, EnvironmentEdgeManager.currentTime());
|
||||
}
|
||||
boolean added = processingServers.add(sn);
|
||||
if (LOG.isDebugEnabled() && added) {
|
||||
LOG.debug("Added " + sn + "; numProcessing=" + processingServers.size());
|
||||
if (processing && processingServers.add(sn)) {
|
||||
LOG.debug("Added {}; numProcessing={}", sn, processingServers.size());
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -903,7 +903,7 @@ public class HMaster extends HRegionServer implements MasterServices {
|
|||
this.regionServerTracker = new RegionServerTracker(zooKeeper, this, this.serverManager);
|
||||
this.regionServerTracker.start(
|
||||
procedureExecutor.getProcedures().stream().filter(p -> p instanceof ServerCrashProcedure)
|
||||
.map(p -> ((ServerCrashProcedure) p).getServerName()).collect(Collectors.toSet()),
|
||||
.map(p -> ((ServerCrashProcedure) p)).collect(Collectors.toSet()),
|
||||
walManager.getLiveServersFromWALDir(), walManager.getSplittingServersFromWALDir());
|
||||
// This manager will be started AFTER hbase:meta is confirmed on line.
|
||||
// hbase.mirror.table.state.to.zookeeper is so hbase1 clients can connect. They read table
|
||||
|
|
|
@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.ServerMetrics;
|
|||
import org.apache.hadoop.hbase.ServerMetricsBuilder;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.client.VersionInfoUtil;
|
||||
import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKListener;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
|
||||
|
@ -117,7 +118,7 @@ public class RegionServerTracker extends ZKListener {
|
|||
* @param liveServersFromWALDir the live region servers from wal directory.
|
||||
* @param splittingServersFromWALDir Servers whose WALs are being actively 'split'.
|
||||
*/
|
||||
public void start(Set<ServerName> deadServersFromPE, Set<ServerName> liveServersFromWALDir,
|
||||
public void start(Set<ServerCrashProcedure> deadServersFromPE, Set<ServerName> liveServersFromWALDir,
|
||||
Set<ServerName> splittingServersFromWALDir)
|
||||
throws KeeperException, IOException {
|
||||
LOG.info("Starting RegionServerTracker; {} have existing ServerCrashProcedures, {} " +
|
||||
|
@ -126,7 +127,9 @@ public class RegionServerTracker extends ZKListener {
|
|||
// deadServersFromPE is made from a list of outstanding ServerCrashProcedures.
|
||||
// splittingServersFromWALDir are being actively split -- the directory in the FS ends in
|
||||
// '-SPLITTING'. Each splitting server should have a corresponding SCP. Log if not.
|
||||
splittingServersFromWALDir.stream().filter(s -> !deadServersFromPE.contains(s)).
|
||||
Set<ServerName> deadServerNames = deadServersFromPE.stream()
|
||||
.map(s -> s.getServerName()).collect(Collectors.toSet());
|
||||
splittingServersFromWALDir.stream().filter(s -> !deadServerNames.contains(s)).
|
||||
forEach(s -> LOG.error("{} has no matching ServerCrashProcedure", s));
|
||||
watcher.registerListener(this);
|
||||
synchronized (this) {
|
||||
|
|
|
@ -49,6 +49,7 @@ import org.apache.hadoop.hbase.client.RegionInfo;
|
|||
import org.apache.hadoop.hbase.client.RetriesExhaustedException;
|
||||
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
|
||||
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
|
||||
import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
|
||||
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegionServer;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
|
@ -316,9 +317,9 @@ public class ServerManager {
|
|||
* @param deadServersFromPE the region servers which already have a SCP associated.
|
||||
* @param liveServersFromWALDir the live region servers from wal directory.
|
||||
*/
|
||||
void findDeadServersAndProcess(Set<ServerName> deadServersFromPE,
|
||||
void findDeadServersAndProcess(Set<ServerCrashProcedure> deadServersFromPE,
|
||||
Set<ServerName> liveServersFromWALDir) {
|
||||
deadServersFromPE.forEach(deadservers::add);
|
||||
deadServersFromPE.forEach(scp -> deadservers.add(scp.getServerName(), !scp.isFinished()));
|
||||
liveServersFromWALDir.stream().filter(sn -> !onlineServers.containsKey(sn))
|
||||
.forEach(this::expireServer);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue