HBASE-23322 [hbck2] Simplification on HBCKSCP scheduling
Signed-off-by: Lijin Bin <binlijin@apache.org>
This commit is contained in:
parent
834ccb4bf6
commit
77b4e8c972
|
@ -32,7 +32,6 @@ import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Map.Entry;
|
import java.util.Map.Entry;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.function.Function;
|
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
|
@ -71,7 +70,6 @@ import org.apache.hadoop.hbase.ipc.RpcServerInterface;
|
||||||
import org.apache.hadoop.hbase.ipc.ServerRpcController;
|
import org.apache.hadoop.hbase.ipc.ServerRpcController;
|
||||||
import org.apache.hadoop.hbase.master.assignment.RegionStates;
|
import org.apache.hadoop.hbase.master.assignment.RegionStates;
|
||||||
import org.apache.hadoop.hbase.master.locking.LockProcedure;
|
import org.apache.hadoop.hbase.master.locking.LockProcedure;
|
||||||
import org.apache.hadoop.hbase.master.procedure.HBCKServerCrashProcedure;
|
|
||||||
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
|
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
|
||||||
import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil;
|
import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil;
|
||||||
import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil.NonceProcedureRunnable;
|
import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil.NonceProcedureRunnable;
|
||||||
|
@ -2587,35 +2585,18 @@ public class MasterRpcServices extends RSRpcServices
|
||||||
public MasterProtos.ScheduleServerCrashProcedureResponse scheduleServerCrashProcedure(
|
public MasterProtos.ScheduleServerCrashProcedureResponse scheduleServerCrashProcedure(
|
||||||
RpcController controller, MasterProtos.ScheduleServerCrashProcedureRequest request)
|
RpcController controller, MasterProtos.ScheduleServerCrashProcedureRequest request)
|
||||||
throws ServiceException {
|
throws ServiceException {
|
||||||
List<HBaseProtos.ServerName> serverNames = request.getServerNameList();
|
|
||||||
List<Long> pids = new ArrayList<>();
|
List<Long> pids = new ArrayList<>();
|
||||||
try {
|
for (HBaseProtos.ServerName sn: request.getServerNameList()) {
|
||||||
for (HBaseProtos.ServerName sn: serverNames) {
|
ServerName serverName = ProtobufUtil.toServerName(sn);
|
||||||
ServerName serverName = ProtobufUtil.toServerName(sn);
|
LOG.info("{} schedule ServerCrashProcedure for {}",
|
||||||
LOG.info("{} schedule ServerCrashProcedure for {}",
|
this.master.getClientIdAuditPrefix(), serverName);
|
||||||
this.master.getClientIdAuditPrefix(), serverName);
|
if (shouldSubmitSCP(serverName)) {
|
||||||
if (shouldSubmitSCP(serverName)) {
|
pids.add(this.master.getServerManager().expireServer(serverName, true));
|
||||||
final boolean containsMetaWALs = containMetaWals(serverName);
|
} else {
|
||||||
long pid = this.master.getServerManager().expireServer(serverName,
|
pids.add(Procedure.NO_PROC_ID);
|
||||||
new Function<ServerName, Long>() {
|
|
||||||
@Override
|
|
||||||
public Long apply(ServerName serverName) {
|
|
||||||
ProcedureExecutor<MasterProcedureEnv> procExec =
|
|
||||||
master.getMasterProcedureExecutor();
|
|
||||||
return procExec.submitProcedure(
|
|
||||||
new HBCKServerCrashProcedure(procExec.getEnvironment(),
|
|
||||||
serverName, true, containsMetaWALs));
|
|
||||||
}
|
|
||||||
});
|
|
||||||
pids.add(pid);
|
|
||||||
} else {
|
|
||||||
pids.add(Procedure.NO_PROC_ID);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
return MasterProtos.ScheduleServerCrashProcedureResponse.newBuilder().addAllPid(pids).build();
|
|
||||||
} catch (IOException e) {
|
|
||||||
throw new ServiceException(e);
|
|
||||||
}
|
}
|
||||||
|
return MasterProtos.ScheduleServerCrashProcedureResponse.newBuilder().addAllPid(pids).build();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -33,7 +33,6 @@ import java.util.concurrent.ConcurrentNavigableMap;
|
||||||
import java.util.concurrent.ConcurrentSkipListMap;
|
import java.util.concurrent.ConcurrentSkipListMap;
|
||||||
import java.util.concurrent.CopyOnWriteArrayList;
|
import java.util.concurrent.CopyOnWriteArrayList;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
import java.util.function.Function;
|
|
||||||
import java.util.function.Predicate;
|
import java.util.function.Predicate;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FSDataInputStream;
|
import org.apache.hadoop.fs.FSDataInputStream;
|
||||||
|
@ -562,32 +561,18 @@ public class ServerManager {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Expire the passed server. Add it to list of dead servers and queue a shutdown processing.
|
* Expire the passed server. Add it to list of dead servers and queue a shutdown processing.
|
||||||
* @return True if we queued a ServerCrashProcedure else false if we did not (could happen for
|
* @return pid if we queued a ServerCrashProcedure else {@link Procedure#NO_PROC_ID} if we did
|
||||||
* many reasons including the fact that its this server that is going down or we already
|
* not (could happen for many reasons including the fact that its this server that is
|
||||||
* have queued an SCP for this server or SCP processing is currently disabled because we
|
* going down or we already have queued an SCP for this server or SCP processing is
|
||||||
* are in startup phase).
|
* currently disabled because we are in startup phase).
|
||||||
*/
|
*/
|
||||||
public boolean expireServer(final ServerName serverName) {
|
@VisibleForTesting // Redo test so we can make this protected.
|
||||||
return expireServer(serverName, new Function<ServerName, Long>() {
|
public synchronized long expireServer(final ServerName serverName) {
|
||||||
@Override
|
return expireServer(serverName, false);
|
||||||
public Long apply(ServerName serverName) {
|
|
||||||
return master.getAssignmentManager().submitServerCrash(serverName, true);
|
|
||||||
}
|
|
||||||
}) != Procedure.NO_PROC_ID;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
synchronized long expireServer(final ServerName serverName, boolean force) {
|
||||||
* Expire the passed server. Add it to list of dead servers and queue a shutdown processing.
|
|
||||||
* Used when expireServer is externally invoked by hbck2.
|
|
||||||
* @param function Takes ServerName and returns pid. See default implementation which queues
|
|
||||||
* an SCP via the AssignmentManager.
|
|
||||||
* @return True if we queued a ServerCrashProcedure else false if we did not (could happen for
|
|
||||||
* many reasons including the fact that its this server that is going down or we already
|
|
||||||
* have queued an SCP for this server or SCP processing is currently disabled because we
|
|
||||||
* are in startup phase).
|
|
||||||
*/
|
|
||||||
synchronized long expireServer(final ServerName serverName,
|
|
||||||
Function<ServerName, Long> function) {
|
|
||||||
// THIS server is going down... can't handle our own expiration.
|
// THIS server is going down... can't handle our own expiration.
|
||||||
if (serverName.equals(master.getServerName())) {
|
if (serverName.equals(master.getServerName())) {
|
||||||
if (!(master.isAborted() || master.isStopped())) {
|
if (!(master.isAborted() || master.isStopped())) {
|
||||||
|
@ -612,10 +597,7 @@ public class ServerManager {
|
||||||
return Procedure.NO_PROC_ID;
|
return Procedure.NO_PROC_ID;
|
||||||
}
|
}
|
||||||
LOG.info("Processing expiration of " + serverName + " on " + this.master.getServerName());
|
LOG.info("Processing expiration of " + serverName + " on " + this.master.getServerName());
|
||||||
long pid = function.apply(serverName);
|
long pid = master.getAssignmentManager().submitServerCrash(serverName, true, force);
|
||||||
if (pid <= 0) {
|
|
||||||
return Procedure.NO_PROC_ID;
|
|
||||||
}
|
|
||||||
// Tell our listeners that a server was removed
|
// Tell our listeners that a server was removed
|
||||||
if (!this.listeners.isEmpty()) {
|
if (!this.listeners.isEmpty()) {
|
||||||
this.listeners.stream().forEach(l -> l.serverRemoved(serverName));
|
this.listeners.stream().forEach(l -> l.serverRemoved(serverName));
|
||||||
|
|
|
@ -58,6 +58,7 @@ import org.apache.hadoop.hbase.master.RegionState.State;
|
||||||
import org.apache.hadoop.hbase.master.ServerManager;
|
import org.apache.hadoop.hbase.master.ServerManager;
|
||||||
import org.apache.hadoop.hbase.master.TableStateManager;
|
import org.apache.hadoop.hbase.master.TableStateManager;
|
||||||
import org.apache.hadoop.hbase.master.balancer.FavoredStochasticBalancer;
|
import org.apache.hadoop.hbase.master.balancer.FavoredStochasticBalancer;
|
||||||
|
import org.apache.hadoop.hbase.master.procedure.HBCKServerCrashProcedure;
|
||||||
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
|
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
|
||||||
import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler;
|
import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler;
|
||||||
import org.apache.hadoop.hbase.master.procedure.ProcedureSyncWait;
|
import org.apache.hadoop.hbase.master.procedure.ProcedureSyncWait;
|
||||||
|
@ -1490,15 +1491,21 @@ public class AssignmentManager {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
public long submitServerCrash(ServerName serverName, boolean shouldSplitWal) {
|
/**
|
||||||
boolean carryingMeta;
|
* Usually run by the Master in reaction to server crash during normal processing.
|
||||||
long pid;
|
* Can also be invoked via external RPC to effect repair; in the latter case,
|
||||||
|
* the 'force' flag is set so we push through the SCP though context may indicate
|
||||||
|
* already-running-SCP (An old SCP may have exited abnormally, or damaged cluster
|
||||||
|
* may still have references in hbase:meta to 'Unknown Servers' -- servers that
|
||||||
|
* are not online or in dead servers list, etc.)
|
||||||
|
* @param force Set if the request came in externally over RPC (via hbck2). Force means
|
||||||
|
* run the SCP even if it seems as though there might be an outstanding
|
||||||
|
* SCP running.
|
||||||
|
* @return pid of scheduled SCP or {@link Procedure#NO_PROC_ID} if none scheduled.
|
||||||
|
*/
|
||||||
|
public long submitServerCrash(ServerName serverName, boolean shouldSplitWal, boolean force) {
|
||||||
|
// May be an 'Unknown Server' so handle case where serverNode is null.
|
||||||
ServerStateNode serverNode = regionStates.getServerNode(serverName);
|
ServerStateNode serverNode = regionStates.getServerNode(serverName);
|
||||||
if (serverNode == null) {
|
|
||||||
LOG.info("Skip to add SCP for {} since this server should be OFFLINE already", serverName);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Remove the in-memory rsReports result
|
// Remove the in-memory rsReports result
|
||||||
synchronized (rsReports) {
|
synchronized (rsReports) {
|
||||||
rsReports.remove(serverName);
|
rsReports.remove(serverName);
|
||||||
|
@ -1508,26 +1515,43 @@ public class AssignmentManager {
|
||||||
// server state to CRASHED, we will no longer accept the reportRegionStateTransition call from
|
// server state to CRASHED, we will no longer accept the reportRegionStateTransition call from
|
||||||
// this server. This is used to simplify the implementation for TRSP and SCP, where we can make
|
// this server. This is used to simplify the implementation for TRSP and SCP, where we can make
|
||||||
// sure that, the region list fetched by SCP will not be changed any more.
|
// sure that, the region list fetched by SCP will not be changed any more.
|
||||||
serverNode.writeLock().lock();
|
if (serverNode != null) {
|
||||||
|
serverNode.writeLock().lock();
|
||||||
|
}
|
||||||
|
boolean carryingMeta;
|
||||||
|
long pid;
|
||||||
try {
|
try {
|
||||||
ProcedureExecutor<MasterProcedureEnv> procExec = this.master.getMasterProcedureExecutor();
|
ProcedureExecutor<MasterProcedureEnv> procExec = this.master.getMasterProcedureExecutor();
|
||||||
carryingMeta = isCarryingMeta(serverName);
|
carryingMeta = isCarryingMeta(serverName);
|
||||||
if (!serverNode.isInState(ServerState.ONLINE)) {
|
if (!force && serverNode != null && !serverNode.isInState(ServerState.ONLINE)) {
|
||||||
LOG.info(
|
LOG.info("Skip adding SCP for {} (meta={}) -- running?", serverNode, carryingMeta);
|
||||||
"Skip to add SCP for {} with meta= {}, " +
|
return Procedure.NO_PROC_ID;
|
||||||
"since there should be a SCP is processing or already done for this server node",
|
|
||||||
serverName, carryingMeta);
|
|
||||||
return -1;
|
|
||||||
} else {
|
} else {
|
||||||
serverNode.setState(ServerState.CRASHED);
|
MasterProcedureEnv mpe = procExec.getEnvironment();
|
||||||
pid = procExec.submitProcedure(new ServerCrashProcedure(procExec.getEnvironment(),
|
// If serverNode == null, then 'Unknown Server'. Schedule HBCKSCP instead.
|
||||||
serverName, shouldSplitWal, carryingMeta));
|
// HBCKSCP scours Master in-memory state AND hbase;meta for references to
|
||||||
LOG.info(
|
// serverName just-in-case. An SCP that is scheduled when the server is
|
||||||
"Added {} to dead servers which carryingMeta={}, submitted ServerCrashProcedure pid={}",
|
// 'Unknown' probably originated externally with HBCK2 fix-it tool.
|
||||||
serverName, carryingMeta, pid);
|
ServerState oldState = null;
|
||||||
|
if (serverNode != null) {
|
||||||
|
oldState = serverNode.getState();
|
||||||
|
serverNode.setState(ServerState.CRASHED);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (force) {
|
||||||
|
pid = procExec.submitProcedure(
|
||||||
|
new HBCKServerCrashProcedure(mpe, serverName, shouldSplitWal, carryingMeta));
|
||||||
|
} else {
|
||||||
|
pid = procExec.submitProcedure(
|
||||||
|
new ServerCrashProcedure(mpe, serverName, shouldSplitWal, carryingMeta));
|
||||||
|
}
|
||||||
|
LOG.info("Scheduled SCP pid={} for {} (carryingMeta={}){}.", pid, serverName, carryingMeta,
|
||||||
|
serverNode == null? "": " " + serverNode.toString() + ", oldState=" + oldState);
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
serverNode.writeLock().unlock();
|
if (serverNode != null) {
|
||||||
|
serverNode.writeLock().unlock();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return pid;
|
return pid;
|
||||||
}
|
}
|
||||||
|
|
|
@ -714,6 +714,9 @@ public class RegionStates {
|
||||||
serverMap.remove(serverName);
|
serverMap.remove(serverName);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return Pertinent ServerStateNode or NULL if none found.
|
||||||
|
*/
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
public ServerStateNode getServerNode(final ServerName serverName) {
|
public ServerStateNode getServerNode(final ServerName serverName) {
|
||||||
return serverMap.get(serverName);
|
return serverMap.get(serverName);
|
||||||
|
|
|
@ -33,12 +33,9 @@ import org.apache.yetus.audience.InterfaceAudience;
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public class ServerStateNode implements Comparable<ServerStateNode> {
|
public class ServerStateNode implements Comparable<ServerStateNode> {
|
||||||
|
|
||||||
private final Set<RegionStateNode> regions;
|
private final Set<RegionStateNode> regions;
|
||||||
private final ServerName serverName;
|
private final ServerName serverName;
|
||||||
|
|
||||||
private final ReadWriteLock lock = new ReentrantReadWriteLock();
|
private final ReadWriteLock lock = new ReentrantReadWriteLock();
|
||||||
|
|
||||||
private volatile ServerState state = ServerState.ONLINE;
|
private volatile ServerState state = ServerState.ONLINE;
|
||||||
|
|
||||||
public ServerStateNode(ServerName serverName) {
|
public ServerStateNode(ServerName serverName) {
|
||||||
|
@ -120,6 +117,7 @@ public class ServerStateNode implements Comparable<ServerStateNode> {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String toString() {
|
public String toString() {
|
||||||
return String.format("ServerStateNode(%s)", getServerName());
|
return getServerName() + "/" + getState() + "/regionCount=" + this.regions.size() +
|
||||||
|
"/lock=" + this.lock;
|
||||||
}
|
}
|
||||||
}
|
}
|
|
@ -81,12 +81,15 @@ public class HBCKServerCrashProcedure extends ServerCrashProcedure {
|
||||||
LOG.warn("Failed get of all regions; continuing", ioe);
|
LOG.warn("Failed get of all regions; continuing", ioe);
|
||||||
}
|
}
|
||||||
if (ps == null || ps.isEmpty()) {
|
if (ps == null || ps.isEmpty()) {
|
||||||
|
LOG.warn("No regions found in hbase:meta");
|
||||||
return ris;
|
return ris;
|
||||||
}
|
}
|
||||||
List<RegionInfo> aggregate = ris == null || ris.isEmpty()?
|
List<RegionInfo> aggregate = ris == null || ris.isEmpty()?
|
||||||
new ArrayList<>(): new ArrayList<>(ris);
|
new ArrayList<>(): new ArrayList<>(ris);
|
||||||
|
int before = aggregate.size();
|
||||||
ps.stream().filter(p -> p.getSecond() != null && p.getSecond().equals(getServerName())).
|
ps.stream().filter(p -> p.getSecond() != null && p.getSecond().equals(getServerName())).
|
||||||
forEach(p -> aggregate.add(p.getFirst()));
|
forEach(p -> aggregate.add(p.getFirst()));
|
||||||
|
LOG.info("Found {} mentions of {} in hbase:meta", aggregate.size() - before, getServerName());
|
||||||
return aggregate;
|
return aggregate;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -109,7 +109,8 @@ public class TestClusterRestartFailover extends AbstractTestRestartCluster {
|
||||||
((ServerCrashProcedure) p).getServerName().equals(SERVER_FOR_TEST)).findAny();
|
((ServerCrashProcedure) p).getServerName().equals(SERVER_FOR_TEST)).findAny();
|
||||||
assertTrue("Should have one SCP for " + SERVER_FOR_TEST, procedure.isPresent());
|
assertTrue("Should have one SCP for " + SERVER_FOR_TEST, procedure.isPresent());
|
||||||
assertFalse("Submit the SCP for the same serverName " + SERVER_FOR_TEST + " which should fail",
|
assertFalse("Submit the SCP for the same serverName " + SERVER_FOR_TEST + " which should fail",
|
||||||
UTIL.getHBaseCluster().getMaster().getServerManager().expireServer(SERVER_FOR_TEST));
|
UTIL.getHBaseCluster().getMaster().getServerManager().expireServer(SERVER_FOR_TEST) ==
|
||||||
|
Procedure.NO_PROC_ID);
|
||||||
|
|
||||||
// Wait the SCP to finish
|
// Wait the SCP to finish
|
||||||
SCP_LATCH.countDown();
|
SCP_LATCH.countDown();
|
||||||
|
@ -117,7 +118,8 @@ public class TestClusterRestartFailover extends AbstractTestRestartCluster {
|
||||||
|
|
||||||
assertFalse("Even when the SCP is finished, the duplicate SCP should not be scheduled for " +
|
assertFalse("Even when the SCP is finished, the duplicate SCP should not be scheduled for " +
|
||||||
SERVER_FOR_TEST,
|
SERVER_FOR_TEST,
|
||||||
UTIL.getHBaseCluster().getMaster().getServerManager().expireServer(SERVER_FOR_TEST));
|
UTIL.getHBaseCluster().getMaster().getServerManager().expireServer(SERVER_FOR_TEST) ==
|
||||||
|
Procedure.NO_PROC_ID);
|
||||||
serverNode = UTIL.getHBaseCluster().getMaster().getAssignmentManager().getRegionStates()
|
serverNode = UTIL.getHBaseCluster().getMaster().getAssignmentManager().getRegionStates()
|
||||||
.getServerNode(SERVER_FOR_TEST);
|
.getServerNode(SERVER_FOR_TEST);
|
||||||
assertNull("serverNode should be deleted after SCP finished", serverNode);
|
assertNull("serverNode should be deleted after SCP finished", serverNode);
|
||||||
|
|
|
@ -304,7 +304,7 @@ public abstract class TestAssignmentManagerBase {
|
||||||
|
|
||||||
protected void doCrash(final ServerName serverName) {
|
protected void doCrash(final ServerName serverName) {
|
||||||
this.master.getServerManager().moveFromOnlineToDeadServers(serverName);
|
this.master.getServerManager().moveFromOnlineToDeadServers(serverName);
|
||||||
this.am.submitServerCrash(serverName, false/* No WALs here */);
|
this.am.submitServerCrash(serverName, false/* No WALs here */, false);
|
||||||
// add a new server to avoid killing all the region servers which may hang the UTs
|
// add a new server to avoid killing all the region servers which may hang the UTs
|
||||||
ServerName newSn = ServerName.valueOf("localhost", 10000 + newRsAdded, 1);
|
ServerName newSn = ServerName.valueOf("localhost", 10000 + newRsAdded, 1);
|
||||||
newRsAdded++;
|
newRsAdded++;
|
||||||
|
|
Loading…
Reference in New Issue