HBASE-21508 Ignore the reportRegionStateTransition call from a dead server
Signed-off-by: Guanghao Zhang <zghao@apache.org>
parent 27c0bf5c63
commit a0e3cb6c0c
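In short, as reconstructed from the diff below: the master now keeps a per-ServerStateNode read/write lock and a new CRASHED server state. reportRegionStateTransition runs under the read lock and is rejected once the server has been marked CRASHED, while submitServerCrash marks the server CRASHED under the write lock before scheduling the ServerCrashProcedure, so the region list fetched by the SCP can no longer be changed by late reports from the dead server. The now-unused ServerListener/report-event plumbing is removed, and a new test, TestReportRegionStateTransitionFromDeadServer, covers the race.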
@@ -3800,8 +3800,7 @@ public class HMaster extends HRegionServer implements MasterServices {
     if (offload) {
       final List<ServerName> destServers = this.serverManager.createDestinationServersList();
       for (ServerName server : serversAdded) {
-        final List<RegionInfo> regionsOnServer =
-          this.assignmentManager.getRegionStates().getServerRegionInfoSet(server);
+        final List<RegionInfo> regionsOnServer = this.assignmentManager.getRegionsOnServer(server);
         for (RegionInfo hri : regionsOnServer) {
           ServerName dest = balancer.randomAssignment(hri, destServers);
           if (dest == null) {
@@ -53,7 +53,6 @@ import org.apache.hadoop.hbase.master.MetricsAssignmentManager;
 import org.apache.hadoop.hbase.master.RegionPlan;
 import org.apache.hadoop.hbase.master.RegionState;
 import org.apache.hadoop.hbase.master.RegionState.State;
-import org.apache.hadoop.hbase.master.ServerListener;
 import org.apache.hadoop.hbase.master.TableStateManager;
 import org.apache.hadoop.hbase.master.balancer.FavoredStochasticBalancer;
 import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
@@ -99,7 +98,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProto
  * Unassigns are triggered by DisableTable, Split, Merge
  */
 @InterfaceAudience.Private
-public class AssignmentManager implements ServerListener {
+public class AssignmentManager {
   private static final Logger LOG = LoggerFactory.getLogger(AssignmentManager.class);

   // TODO: AMv2
@@ -193,9 +192,6 @@ public class AssignmentManager implements ServerListener {

     LOG.trace("Starting assignment manager");

-    // Register Server Listener
-    master.getServerManager().registerListener(this);
-
     // Start the Assignment Thread
     startAssignmentThread();

@@ -275,9 +271,6 @@ public class AssignmentManager implements ServerListener {
     // Stop the RegionStateStore
     regionStates.clear();

-    // Unregister Server Listener
-    master.getServerManager().unregisterListener(this);
-
     // Update meta events (for testing)
     if (hasProcExecutor) {
       metaLoadEvent.suspend();
@@ -319,14 +312,31 @@ public class AssignmentManager implements ServerListener {
     return regionStates;
   }

+  /**
+   * Returns the regions hosted by the specified server.
+   * <p/>
+   * Notice that, for SCP, after we submit the SCP, no one can change the region list for the
+   * ServerStateNode so we do not need any locks here. And for other usage, this can only give you a
+   * snapshot of the current region list for this server, which means, right after you get the
+   * region list, new regions may be moved to this server or some regions may be moved out from this
+   * server, so you should not use it critically if you need strong consistency.
+   */
+  public List<RegionInfo> getRegionsOnServer(ServerName serverName) {
+    ServerStateNode serverInfo = regionStates.getServerNode(serverName);
+    if (serverInfo == null) {
+      return Collections.emptyList();
+    }
+    return serverInfo.getRegionInfoList();
+  }
+
   public RegionStateStore getRegionStateStore() {
     return regionStateStore;
   }

   public List<ServerName> getFavoredNodes(final RegionInfo regionInfo) {
-    return this.shouldAssignRegionsWithFavoredNodes?
-      ((FavoredStochasticBalancer)getBalancer()).getFavoredNodes(regionInfo):
-      ServerName.EMPTY_SERVER_LIST;
+    return this.shouldAssignRegionsWithFavoredNodes
+      ? ((FavoredStochasticBalancer) getBalancer()).getFavoredNodes(regionInfo)
+      : ServerName.EMPTY_SERVER_LIST;
   }

   // ============================================================================================
@@ -522,12 +532,11 @@ public class AssignmentManager implements ServerListener {
   }

   private List<RegionInfo> getSystemTables(ServerName serverName) {
-    Set<RegionStateNode> regions = this.getRegionStates().getServerNode(serverName).getRegions();
-    if (regions == null) {
+    ServerStateNode serverNode = regionStates.getServerNode(serverName);
+    if (serverNode == null) {
       return Collections.emptyList();
     }
-    return regions.stream().map(RegionStateNode::getRegionInfo)
-      .filter(r -> r.getTable().isSystemTable()).collect(Collectors.toList());
+    return serverNode.getSystemRegionInfoList();
   }

   private void preTransitCheck(RegionStateNode regionNode, RegionState.State[] expectedStates)
@@ -817,54 +826,79 @@ public class AssignmentManager implements ServerListener {
   // ============================================================================================
   // RS Region Transition Report helpers
   // ============================================================================================
-  // TODO: Move this code in MasterRpcServices and call on specific event?
-  public ReportRegionStateTransitionResponse reportRegionStateTransition(
-      final ReportRegionStateTransitionRequest req) throws PleaseHoldException {
-    final ReportRegionStateTransitionResponse.Builder builder =
-        ReportRegionStateTransitionResponse.newBuilder();
-    final ServerName serverName = ProtobufUtil.toServerName(req.getServer());
-    try {
-      for (RegionStateTransition transition: req.getTransitionList()) {
-        switch (transition.getTransitionCode()) {
-          case OPENED:
-          case FAILED_OPEN:
-          case CLOSED:
-            assert transition.getRegionInfoCount() == 1 : transition;
-            final RegionInfo hri = ProtobufUtil.toRegionInfo(transition.getRegionInfo(0));
-            updateRegionTransition(serverName, transition.getTransitionCode(), hri,
-              transition.hasOpenSeqNum() ? transition.getOpenSeqNum() : HConstants.NO_SEQNUM);
-            break;
-          case READY_TO_SPLIT:
-          case SPLIT:
-          case SPLIT_REVERTED:
-            assert transition.getRegionInfoCount() == 3 : transition;
-            final RegionInfo parent = ProtobufUtil.toRegionInfo(transition.getRegionInfo(0));
-            final RegionInfo splitA = ProtobufUtil.toRegionInfo(transition.getRegionInfo(1));
-            final RegionInfo splitB = ProtobufUtil.toRegionInfo(transition.getRegionInfo(2));
-            updateRegionSplitTransition(serverName, transition.getTransitionCode(),
-              parent, splitA, splitB);
-            break;
-          case READY_TO_MERGE:
-          case MERGED:
-          case MERGE_REVERTED:
-            assert transition.getRegionInfoCount() == 3 : transition;
-            final RegionInfo merged = ProtobufUtil.toRegionInfo(transition.getRegionInfo(0));
-            final RegionInfo mergeA = ProtobufUtil.toRegionInfo(transition.getRegionInfo(1));
-            final RegionInfo mergeB = ProtobufUtil.toRegionInfo(transition.getRegionInfo(2));
-            updateRegionMergeTransition(serverName, transition.getTransitionCode(),
-              merged, mergeA, mergeB);
-            break;
-        }
-      }
-    } catch (PleaseHoldException e) {
-      LOG.trace("Failed transition ", e);
-      throw e;
-    } catch (UnsupportedOperationException|IOException e) {
-      // TODO: at the moment we have a single error message and the RS will abort
-      // if the master says that one of the region transitions failed.
-      LOG.warn("Failed transition", e);
-      builder.setErrorMessage("Failed transition " + e.getMessage());
+  private void reportRegionStateTransition(ReportRegionStateTransitionResponse.Builder builder,
+      ServerName serverName, List<RegionStateTransition> transitionList) throws IOException {
+    for (RegionStateTransition transition : transitionList) {
+      switch (transition.getTransitionCode()) {
+        case OPENED:
+        case FAILED_OPEN:
+        case CLOSED:
+          assert transition.getRegionInfoCount() == 1 : transition;
+          final RegionInfo hri = ProtobufUtil.toRegionInfo(transition.getRegionInfo(0));
+          updateRegionTransition(serverName, transition.getTransitionCode(), hri,
+            transition.hasOpenSeqNum() ? transition.getOpenSeqNum() : HConstants.NO_SEQNUM);
+          break;
+        case READY_TO_SPLIT:
+        case SPLIT:
+        case SPLIT_REVERTED:
+          assert transition.getRegionInfoCount() == 3 : transition;
+          final RegionInfo parent = ProtobufUtil.toRegionInfo(transition.getRegionInfo(0));
+          final RegionInfo splitA = ProtobufUtil.toRegionInfo(transition.getRegionInfo(1));
+          final RegionInfo splitB = ProtobufUtil.toRegionInfo(transition.getRegionInfo(2));
+          updateRegionSplitTransition(serverName, transition.getTransitionCode(), parent, splitA,
+            splitB);
+          break;
+        case READY_TO_MERGE:
+        case MERGED:
+        case MERGE_REVERTED:
+          assert transition.getRegionInfoCount() == 3 : transition;
+          final RegionInfo merged = ProtobufUtil.toRegionInfo(transition.getRegionInfo(0));
+          final RegionInfo mergeA = ProtobufUtil.toRegionInfo(transition.getRegionInfo(1));
+          final RegionInfo mergeB = ProtobufUtil.toRegionInfo(transition.getRegionInfo(2));
+          updateRegionMergeTransition(serverName, transition.getTransitionCode(), merged, mergeA,
+            mergeB);
+          break;
+      }
+    }
+  }
+
+  public ReportRegionStateTransitionResponse reportRegionStateTransition(
+      final ReportRegionStateTransitionRequest req) throws PleaseHoldException {
+    ReportRegionStateTransitionResponse.Builder builder =
+      ReportRegionStateTransitionResponse.newBuilder();
+    ServerName serverName = ProtobufUtil.toServerName(req.getServer());
+    ServerStateNode serverNode = regionStates.getOrCreateServer(serverName);
+    // here we have to acquire a read lock instead of a simple exclusive lock. This is because that
+    // we should not block other reportRegionStateTransition call from the same region server. This
+    // is not only about performance, but also to prevent dead lock. Think of the meta region is
+    // also on the same region server and you hold the lock which blocks the
+    // reportRegionStateTransition for meta, and since meta is not online, you will block inside the
+    // lock protection to wait for meta online...
+    serverNode.readLock().lock();
+    try {
+      // we only accept reportRegionStateTransition if the region server is online, see the comment
+      // above in submitServerCrash method and HBASE-21508 for more details.
+      if (serverNode.isInState(ServerState.ONLINE)) {
+        try {
+          reportRegionStateTransition(builder, serverName, req.getTransitionList());
+        } catch (PleaseHoldException e) {
+          LOG.trace("Failed transition ", e);
+          throw e;
+        } catch (UnsupportedOperationException | IOException e) {
+          // TODO: at the moment we have a single error message and the RS will abort
+          // if the master says that one of the region transitions failed.
+          LOG.warn("Failed transition", e);
+          builder.setErrorMessage("Failed transition " + e.getMessage());
+        }
+      } else {
+        LOG.warn("The region server {} is already dead, skip reportRegionStateTransition call",
+          serverName);
+        builder.setErrorMessage("You are dead");
+      }
+    } finally {
+      serverNode.readLock().unlock();
     }

     return builder.build();
   }
@@ -1017,9 +1051,6 @@ public class AssignmentManager implements ServerListener {
     }
     // The Heartbeat tells us of what regions are on the region serve, check the state.
     checkOnlineRegionsReport(serverNode, regionNames);
-
-    // wake report event
-    wakeServerReportEvent(serverNode);
   }

   // just check and output possible inconsistency, without actually doing anything
@@ -1061,18 +1092,6 @@ public class AssignmentManager implements ServerListener {
     }
   }

-  protected boolean waitServerReportEvent(ServerName serverName, Procedure<?> proc) {
-    final ServerStateNode serverNode = regionStates.getOrCreateServer(serverName);
-    if (serverNode == null) {
-      LOG.warn("serverName=null; {}", proc);
-    }
-    return serverNode.getReportEvent().suspendIfNotReady(proc);
-  }
-
-  protected void wakeServerReportEvent(final ServerStateNode serverNode) {
-    serverNode.getReportEvent().wake(getProcedureScheduler());
-  }
-
   // ============================================================================================
   // RIT chore
   // ============================================================================================
@@ -1321,13 +1340,27 @@ public class AssignmentManager implements ServerListener {
     return 0;
   }

-  public long submitServerCrash(final ServerName serverName, final boolean shouldSplitWal) {
-    boolean carryingMeta = isCarryingMeta(serverName);
-    ProcedureExecutor<MasterProcedureEnv> procExec = this.master.getMasterProcedureExecutor();
-    long pid = procExec.submitProcedure(new ServerCrashProcedure(procExec.getEnvironment(),
-      serverName, shouldSplitWal, carryingMeta));
-    LOG.debug("Added=" + serverName
-      + " to dead servers, submitted shutdown handler to be executed meta=" + carryingMeta);
+  public long submitServerCrash(ServerName serverName, boolean shouldSplitWal) {
+    boolean carryingMeta;
+    long pid;
+    ServerStateNode serverNode = regionStates.getOrCreateServer(serverName);
+    // we hold the write lock here for fencing on reportRegionStateTransition. Once we set the
+    // server state to CRASHED, we will no longer accept the reportRegionStateTransition call from
+    // this server. This is used to simplify the implementation for TRSP and SCP, where we can make
+    // sure that, the region list fetched by SCP will not be changed any more.
+    serverNode.writeLock().lock();
+    try {
+      serverNode.setState(ServerState.CRASHED);
+      carryingMeta = isCarryingMeta(serverName);
+      ProcedureExecutor<MasterProcedureEnv> procExec = this.master.getMasterProcedureExecutor();
+      pid = procExec.submitProcedure(new ServerCrashProcedure(procExec.getEnvironment(), serverName,
+        shouldSplitWal, carryingMeta));
+    } finally {
+      serverNode.writeLock().unlock();
+    }
+    LOG.info(
+      "Added {} to dead servers which carryingMeta={}, submitted ServerCrashProcedure pid={}",
+      serverName, carryingMeta, pid);
     return pid;
   }

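The two AssignmentManager hunks above are the heart of the fix: reportRegionStateTransition now takes the per-server read lock and only applies transitions while the ServerStateNode is still ONLINE, while submitServerCrash takes the write lock, flips the state to CRASHED, and only then schedules the SCP. A report racing with a crash therefore either completes before the region list is snapshotted, or is rejected. Below is a minimal, self-contained sketch of that fencing pattern; ServerNode, reportTransition and markCrashedAndSnapshotRegions are illustrative names only, not the actual HBase classes.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Illustrative stand-in for a per-server state node: reports are fenced by a read/write lock.
class ServerNode {
  enum State { ONLINE, CRASHED }

  private final ReadWriteLock lock = new ReentrantReadWriteLock();
  private final List<String> regions = new CopyOnWriteArrayList<>();
  private volatile State state = State.ONLINE;

  // RPC path: many reports from the same server may run concurrently, so only the read lock is taken.
  boolean reportTransition(String region) {
    lock.readLock().lock();
    try {
      if (state != State.ONLINE) {
        return false; // "You are dead": the crash handler already fenced this server.
      }
      regions.add(region); // apply the transition while the server is known to be alive
      return true;
    } finally {
      lock.readLock().unlock();
    }
  }

  // Crash path: the write lock excludes all in-flight reports, then flips the state.
  List<String> markCrashedAndSnapshotRegions() {
    lock.writeLock().lock();
    try {
      state = State.CRASHED;
      // After this point no report can be accepted, so the snapshot cannot change under us.
      return List.copyOf(regions);
    } finally {
      lock.writeLock().unlock();
    }
  }
}
```

The read lock keeps independent reports from the same region server (including meta's own report) from blocking one another, which is exactly the deadlock concern spelled out in the comment added to reportRegionStateTransition.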
@@ -1847,22 +1880,6 @@ public class AssignmentManager implements ServerListener {
       .collect(Collectors.toList());
   }

-  // ============================================================================================
-  // Server Helpers
-  // ============================================================================================
-  @Override
-  public void serverAdded(final ServerName serverName) {
-  }
-
-  @Override
-  public void serverRemoved(final ServerName serverName) {
-    final ServerStateNode serverNode = regionStates.getServerNode(serverName);
-    if (serverNode == null) return;
-
-    // just in case, wake procedures waiting for this server report
-    wakeServerReportEvent(serverNode);
-  }
-
   @VisibleForTesting
   MasterServices getMaster() {
     return master;
@@ -357,22 +357,6 @@ public class RegionStates {
       ((hri.isOffline() || hri.isSplit()) && offline);
   }

-  /**
-   * Returns the set of regions hosted by the specified server
-   * @param serverName the server we are interested in
-   * @return set of RegionInfo hosted by the specified server
-   */
-  public List<RegionInfo> getServerRegionInfoSet(final ServerName serverName) {
-    ServerStateNode serverInfo = getServerNode(serverName);
-    if (serverInfo == null) {
-      return Collections.emptyList();
-    }
-
-    synchronized (serverInfo) {
-      return serverInfo.getRegionInfoList();
-    }
-  }
-
   // ============================================================================================
   // Split helpers
   // These methods will only be called in ServerCrashProcedure, and at the end of SCP we will remove
@@ -29,6 +29,11 @@ enum ServerState {
   */
  ONLINE,

+ /**
+  * Indicate that the server has crashed, i.e., we have already scheduled a SCP for it.
+  */
+ CRASHED,
+
  /**
   * Only server which carries meta can have this state. We will split wal for meta and then
   * assign meta first before splitting other wals.
@@ -17,12 +17,15 @@
  */
 package org.apache.hadoop.hbase.master.assignment;

-import java.util.ArrayList;
+import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.client.RegionInfo;
-import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
 import org.apache.yetus.audience.InterfaceAudience;

 /**
@@ -31,23 +34,16 @@ import org.apache.yetus.audience.InterfaceAudience;
 @InterfaceAudience.Private
 class ServerStateNode implements Comparable<ServerStateNode> {

-  private static final class ServerReportEvent extends ProcedureEvent<ServerName> {
-    public ServerReportEvent(final ServerName serverName) {
-      super(serverName);
-    }
-  }
-
-  private final ServerReportEvent reportEvent;
-
   private final Set<RegionStateNode> regions;
   private final ServerName serverName;

+  private final ReadWriteLock lock = new ReentrantReadWriteLock();
+
   private volatile ServerState state = ServerState.ONLINE;

-  public ServerStateNode(final ServerName serverName) {
+  public ServerStateNode(ServerName serverName) {
     this.serverName = serverName;
     this.regions = ConcurrentHashMap.newKeySet();
-    this.reportEvent = new ServerReportEvent(serverName);
   }

   public ServerName getServerName() {
@@ -58,10 +54,6 @@ class ServerStateNode implements Comparable<ServerStateNode> {
     return state;
   }

-  public ProcedureEvent<?> getReportEvent() {
-    return reportEvent;
-  }
-
   public boolean isInState(final ServerState... expected) {
     boolean expectedState = false;
     if (expected != null) {
@@ -76,20 +68,17 @@ class ServerStateNode implements Comparable<ServerStateNode> {
     this.state = state;
   }

-  public Set<RegionStateNode> getRegions() {
-    return regions;
-  }
-
   public int getRegionCount() {
     return regions.size();
   }

-  public ArrayList<RegionInfo> getRegionInfoList() {
-    ArrayList<RegionInfo> hris = new ArrayList<RegionInfo>(regions.size());
-    for (RegionStateNode region : regions) {
-      hris.add(region.getRegionInfo());
-    }
-    return hris;
+  public List<RegionInfo> getRegionInfoList() {
+    return regions.stream().map(RegionStateNode::getRegionInfo).collect(Collectors.toList());
+  }
+
+  public List<RegionInfo> getSystemRegionInfoList() {
+    return regions.stream().filter(RegionStateNode::isSystemTable)
+      .map(RegionStateNode::getRegionInfo).collect(Collectors.toList());
   }

   public void addRegion(final RegionStateNode regionNode) {
@@ -100,6 +89,14 @@ class ServerStateNode implements Comparable<ServerStateNode> {
     this.regions.remove(regionNode);
   }

+  public Lock readLock() {
+    return lock.readLock();
+  }
+
+  public Lock writeLock() {
+    return lock.writeLock();
+  }
+
   @Override
   public int compareTo(final ServerStateNode other) {
     return getServerName().compareTo(other.getServerName());
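ServerStateNode now hands out only snapshot-style views: getRegionInfoList() and getSystemRegionInfoList() stream over the ConcurrentHashMap-backed key set and materialize a fresh list on every call, instead of exposing the live mutable set. A small sketch of the same idiom, using simplified hypothetical types (Node and Region are illustrative, not the HBase classes):

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

// Illustrative only: snapshot copies of a concurrent set, in the style of getRegionInfoList().
class Node {
  static final class Region {
    final String name;
    final boolean system;

    Region(String name, boolean system) {
      this.name = name;
      this.system = system;
    }
  }

  private final Set<Region> regions = ConcurrentHashMap.newKeySet();

  void add(Region r) {
    regions.add(r);
  }

  // Each call materializes an independent list; later changes to the set are not reflected.
  List<String> regionNames() {
    return regions.stream().map(r -> r.name).collect(Collectors.toList());
  }

  // Same idiom with a filter, mirroring getSystemRegionInfoList().
  List<String> systemRegionNames() {
    return regions.stream().filter(r -> r.system).map(r -> r.name).collect(Collectors.toList());
  }
}
```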
@@ -500,11 +500,9 @@ public class TransitRegionStateProcedure
       case REGION_STATE_TRANSITION_CONFIRM_CLOSED:
       case REGION_STATE_TRANSITION_CONFIRM_OPENED:
         // for these 3 states, the region may still be online on the crashed server
-        if (serverName.equals(regionNode.getRegionLocation())) {
-          env.getAssignmentManager().regionClosed(regionNode, false);
-          if (currentState != RegionStateTransitionState.REGION_STATE_TRANSITION_CLOSE) {
-            regionNode.getProcedureEvent().wake(env.getProcedureScheduler());
-          }
+        env.getAssignmentManager().regionClosed(regionNode, false);
+        if (currentState != RegionStateTransitionState.REGION_STATE_TRANSITION_CLOSE) {
+          regionNode.getProcedureEvent().wake(env.getProcedureScheduler());
         }
         break;
       default:
@@ -61,52 +61,62 @@ public final class ProcedureSyncWait {
   }

   private static class ProcedureFuture implements Future<byte[]> {
     private final ProcedureExecutor<MasterProcedureEnv> procExec;
     private final Procedure<?> proc;

     private boolean hasResult = false;
     private byte[] result = null;

     public ProcedureFuture(ProcedureExecutor<MasterProcedureEnv> procExec, Procedure<?> proc) {
       this.procExec = procExec;
       this.proc = proc;
     }

     @Override
-    public boolean cancel(boolean mayInterruptIfRunning) { return false; }
+    public boolean cancel(boolean mayInterruptIfRunning) {
+      return false;
+    }

     @Override
-    public boolean isCancelled() { return false; }
+    public boolean isCancelled() {
+      return false;
+    }

     @Override
-    public boolean isDone() { return hasResult; }
+    public boolean isDone() {
+      return hasResult;
+    }

     @Override
     public byte[] get() throws InterruptedException, ExecutionException {
-      if (hasResult) return result;
+      if (hasResult) {
+        return result;
+      }
       try {
         return waitForProcedureToComplete(procExec, proc, Long.MAX_VALUE);
       } catch (Exception e) {
         throw new ExecutionException(e);
       }
     }

     @Override
     public byte[] get(long timeout, TimeUnit unit)
         throws InterruptedException, ExecutionException, TimeoutException {
-      if (hasResult) return result;
+      if (hasResult) {
+        return result;
+      }
       try {
         result = waitForProcedureToComplete(procExec, proc, unit.toMillis(timeout));
         hasResult = true;
         return result;
       } catch (TimeoutIOException e) {
         throw new TimeoutException(e.getMessage());
       } catch (Exception e) {
         throw new ExecutionException(e);
       }
     }
   }

   public static Future<byte[]> submitProcedure(final ProcedureExecutor<MasterProcedureEnv> procExec,
       final Procedure<MasterProcedureEnv> proc) {
     if (proc.isInitializing()) {
@@ -124,9 +134,8 @@ public final class ProcedureSyncWait {
   }

   public static byte[] waitForProcedureToCompleteIOE(
-      final ProcedureExecutor<MasterProcedureEnv> procExec,
-      final Procedure<?> proc, final long timeout)
-  throws IOException {
+      final ProcedureExecutor<MasterProcedureEnv> procExec, final Procedure<?> proc,
+      final long timeout) throws IOException {
     try {
       return waitForProcedureToComplete(procExec, proc, timeout);
     } catch (IOException e) {
@@ -139,7 +148,7 @@ public final class ProcedureSyncWait {
   public static byte[] waitForProcedureToComplete(
       final ProcedureExecutor<MasterProcedureEnv> procExec, final Procedure<?> proc,
       final long timeout) throws IOException {
-    waitFor(procExec.getEnvironment(), "pid=" + proc.getProcId(),
+    waitFor(procExec.getEnvironment(), timeout, "pid=" + proc.getProcId(),
       new ProcedureSyncWait.Predicate<Boolean>() {
         @Override
         public Boolean evaluate() throws IOException {
@@ -171,15 +180,25 @@ public final class ProcedureSyncWait {

   public static <T> T waitFor(MasterProcedureEnv env, String purpose, Predicate<T> predicate)
       throws IOException {
-    final Configuration conf = env.getMasterConfiguration();
-    final long waitTime = conf.getLong("hbase.master.wait.on.region", 5 * 60 * 1000);
-    final long waitingTimeForEvents = conf.getInt("hbase.master.event.waiting.time", 1000);
+    Configuration conf = env.getMasterConfiguration();
+    long waitTime = conf.getLong("hbase.master.wait.on.region", 5 * 60 * 1000);
+    return waitFor(env, waitTime, purpose, predicate);
+  }
+
+  public static <T> T waitFor(MasterProcedureEnv env, long waitTime, String purpose,
+      Predicate<T> predicate) throws IOException {
+    Configuration conf = env.getMasterConfiguration();
+    long waitingTimeForEvents = conf.getInt("hbase.master.event.waiting.time", 1000);
     return waitFor(env, waitTime, waitingTimeForEvents, purpose, predicate);
   }

   public static <T> T waitFor(MasterProcedureEnv env, long waitTime, long waitingTimeForEvents,
       String purpose, Predicate<T> predicate) throws IOException {
-    final long done = EnvironmentEdgeManager.currentTime() + waitTime;
+    long done = EnvironmentEdgeManager.currentTime() + waitTime;
+    if (done <= 0) {
+      // long overflow, usually this means we pass Long.MAX_VALUE as waitTime
+      done = Long.MAX_VALUE;
+    }
     boolean logged = false;
     do {
       T result = predicate.evaluate();
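One subtle part of the ProcedureSyncWait change is the new overflow guard: ProcedureFuture.get() passes Long.MAX_VALUE as the timeout, and adding that to the current time wraps around to a negative deadline, so without the clamp the wait loop would see a deadline that is already in the past. A minimal illustration of the guard; the deadline() helper below is hypothetical and only mirrors the computation in waitFor:

```java
public final class DeadlineExample {
  // Compute an absolute deadline, clamping on overflow (the same guard added to waitFor above).
  static long deadline(long now, long waitTime) {
    long done = now + waitTime;
    if (done <= 0) {
      // long overflow, e.g. waitTime == Long.MAX_VALUE: treat it as "wait forever"
      done = Long.MAX_VALUE;
    }
    return done;
  }

  public static void main(String[] args) {
    long now = System.currentTimeMillis();
    // Without the guard this prints a large negative number, i.e. a deadline in the past.
    System.out.println(now + Long.MAX_VALUE);
    System.out.println(deadline(now, Long.MAX_VALUE)); // clamped to Long.MAX_VALUE
    System.out.println(deadline(now, 5000));           // normal case: now + 5000
  }
}
```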
@@ -146,7 +146,7 @@ public class ServerCrashProcedure
         break;
       case SERVER_CRASH_GET_REGIONS:
         this.regionsOnCrashedServer =
-          services.getAssignmentManager().getRegionStates().getServerRegionInfoSet(serverName);
+          services.getAssignmentManager().getRegionsOnServer(serverName);
         // Where to go next? Depends on whether we should split logs at all or
         // if we should do distributed log splitting.
         if (!this.shouldSplitWal) {
@@ -713,12 +713,12 @@ public class TestAdmin2 {
     assertEquals(3, clusterRegionServers.size());

     HashMap<ServerName, List<RegionInfo>> serversToDecommssion = new HashMap<>();
-    // Get a server that has regions. We will decommission two of the servers,
+    // Get a server that has meta online. We will decommission two of the servers,
     // leaving one online.
     int i;
     for (i = 0; i < clusterRegionServers.size(); i++) {
       List<RegionInfo> regionsOnServer = admin.getRegions(clusterRegionServers.get(i));
-      if (regionsOnServer.size() > 0) {
+      if (admin.getRegions(clusterRegionServers.get(i)).stream().anyMatch(p -> p.isMetaRegion())) {
         serversToDecommssion.put(clusterRegionServers.get(i), regionsOnServer);
         break;
       }
@@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.master.assignment;
 import static org.mockito.ArgumentMatchers.any;

 import java.io.IOException;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
@@ -28,7 +27,6 @@ import java.util.SortedSet;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.CoordinatedStateManager;
-import org.apache.hadoop.hbase.ServerLoad;
 import org.apache.hadoop.hbase.ServerMetricsBuilder;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableDescriptors;
@@ -51,7 +49,6 @@ import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory;
 import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants;
 import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
 import org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher;
-import org.apache.hadoop.hbase.procedure2.Procedure;
 import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
 import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
 import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
@@ -93,19 +90,15 @@ public class MockMasterServices extends MockNoopMasterServices {
   private final ClusterConnection connection;
   private final LoadBalancer balancer;
   private final ServerManager serverManager;
-  // Set of regions on a 'server'. Populated externally. Used in below faking 'cluster'.
-  private final NavigableMap<ServerName, SortedSet<byte []>> regionsToRegionServers;

-  private final ProcedureEvent initialized = new ProcedureEvent("master initialized");
+  private final ProcedureEvent<?> initialized = new ProcedureEvent<>("master initialized");
   public static final String DEFAULT_COLUMN_FAMILY_NAME = "cf";
   public static final ServerName MOCK_MASTER_SERVERNAME =
       ServerName.valueOf("mockmaster.example.org", 1234, -1L);

   public MockMasterServices(Configuration conf,
-      NavigableMap<ServerName, SortedSet<byte []>> regionsToRegionServers)
-  throws IOException {
+      NavigableMap<ServerName, SortedSet<byte[]>> regionsToRegionServers) throws IOException {
     super(conf);
-    this.regionsToRegionServers = regionsToRegionServers;
     Superusers.initialize(conf);
     this.fileSystemManager = new MasterFileSystem(conf);
     this.walManager = new MasterWalManager(this);
@@ -120,15 +113,6 @@ public class MockMasterServices extends MockNoopMasterServices {
       public boolean isTableDisabled(final TableName tableName) {
         return false;
       }
-
-      @Override
-      protected boolean waitServerReportEvent(ServerName serverName, Procedure proc) {
-        // Make a report with current state of the server 'serverName' before we call wait..
-        SortedSet<byte[]> regions = regionsToRegionServers.get(serverName);
-        getAssignmentManager().reportOnlineRegions(serverName,
-          regions == null ? new HashSet<byte[]>() : regions);
-        return super.waitServerReportEvent(serverName, proc);
-      }
     };
     this.balancer = LoadBalancerFactory.getLoadBalancer(conf);
     this.serverManager = new ServerManager(this);
@@ -176,7 +160,7 @@ public class MockMasterServices extends MockNoopMasterServices {
     this.assignmentManager.start();
     for (int i = 0; i < numServes; ++i) {
       ServerName sn = ServerName.valueOf("localhost", 100 + i, 1);
-      serverManager.regionServerReport(sn, new ServerLoad(ServerMetricsBuilder.of(sn)));
+      serverManager.regionServerReport(sn, ServerMetricsBuilder.of(sn));
     }
     this.procedureExecutor.getEnvironment().setEventReady(initialized, true);
   }
@@ -202,7 +186,7 @@ public class MockMasterServices extends MockNoopMasterServices {
       return;
     }
     ServerName sn = ServerName.valueOf(serverName.getAddress().toString(), startCode);
-    serverManager.regionServerReport(sn, new ServerLoad(ServerMetricsBuilder.of(sn)));
+    serverManager.regionServerReport(sn, ServerMetricsBuilder.of(sn));
   }

   @Override
@@ -260,7 +244,7 @@ public class MockMasterServices extends MockNoopMasterServices {
   }

   @Override
-  public ProcedureEvent getInitializedEvent() {
+  public ProcedureEvent<?> getInitializedEvent() {
     return this.initialized;
   }

@@ -27,7 +27,6 @@ import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.RegionInfoBuilder;
-import org.apache.hadoop.hbase.exceptions.UnexpectedStateException;
 import org.apache.hadoop.hbase.master.RegionState.State;
 import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
 import org.apache.hadoop.hbase.procedure2.util.StringUtils;
@@ -49,14 +48,6 @@ public class TestAssignmentManager extends TestAssignmentManagerBase {

   private static final Logger LOG = LoggerFactory.getLogger(TestAssignmentManager.class);

-  @Test(expected = NullPointerException.class)
-  public void testWaitServerReportEventWithNullServer() throws UnexpectedStateException {
-    // Test what happens if we pass in null server. I'd expect it throws NPE.
-    if (this.am.waitServerReportEvent(null, null)) {
-      throw new UnexpectedStateException();
-    }
-  }
-
   @Test
   public void testAssignWithGoodExec() throws Exception {
     // collect AM metrics before test
@@ -186,7 +186,7 @@ public abstract class TestAssignmentManagerBase {

   protected byte[] waitOnFuture(final Future<byte[]> future) throws Exception {
     try {
-      return future.get(5, TimeUnit.SECONDS);
+      return future.get(60, TimeUnit.SECONDS);
     } catch (ExecutionException e) {
       LOG.info("ExecutionException", e);
       Exception ee = (Exception) e.getCause();
@@ -0,0 +1,201 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.assignment;
+
+import static org.junit.Assert.assertNotEquals;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseIOException;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.PleaseHoldException;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.master.RegionPlan;
+import org.apache.hadoop.hbase.master.ServerManager;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.zookeeper.KeeperException;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse;
+
+@Category({ MasterTests.class, MediumTests.class })
+public class TestReportRegionStateTransitionFromDeadServer {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestReportRegionStateTransitionFromDeadServer.class);
+
+  private static final List<ServerName> EXCLUDE_SERVERS = new ArrayList<>();
+
+  private static CountDownLatch ARRIVE_GET_REGIONS;
+  private static CountDownLatch RESUME_GET_REGIONS;
+  private static CountDownLatch ARRIVE_REPORT;
+  private static CountDownLatch RESUME_REPORT;
+
+  private static final class ServerManagerForTest extends ServerManager {
+
+    public ServerManagerForTest(MasterServices master) {
+      super(master);
+    }
+
+    @Override
+    public List<ServerName> createDestinationServersList() {
+      return super.createDestinationServersList(EXCLUDE_SERVERS);
+    }
+  }
+
+  private static final class AssignmentManagerForTest extends AssignmentManager {
+
+    public AssignmentManagerForTest(MasterServices master) {
+      super(master);
+    }
+
+    @Override
+    public List<RegionInfo> getRegionsOnServer(ServerName serverName) {
+      List<RegionInfo> regions = super.getRegionsOnServer(serverName);
+      if (ARRIVE_GET_REGIONS != null) {
+        ARRIVE_GET_REGIONS.countDown();
+        try {
+          RESUME_GET_REGIONS.await();
+        } catch (InterruptedException e) {
+        }
+      }
+      return regions;
+    }
+
+    @Override
+    public ReportRegionStateTransitionResponse reportRegionStateTransition(
+        ReportRegionStateTransitionRequest req) throws PleaseHoldException {
+      if (ARRIVE_REPORT != null && req.getTransitionList().stream()
+        .allMatch(t -> !ProtobufUtil.toRegionInfo(t.getRegionInfo(0)).isMetaRegion())) {
+        ARRIVE_REPORT.countDown();
+        try {
+          RESUME_REPORT.await();
+        } catch (InterruptedException e) {
+        }
+      }
+      return super.reportRegionStateTransition(req);
+    }
+  }
+
+  public static final class HMasterForTest extends HMaster {
+
+    public HMasterForTest(Configuration conf) throws IOException, KeeperException {
+      super(conf);
+    }
+
+    @Override
+    protected AssignmentManager createAssignmentManager(MasterServices master) {
+      return new AssignmentManagerForTest(master);
+    }
+
+    @Override
+    protected ServerManager createServerManager(MasterServices master) throws IOException {
+      setupClusterConnection();
+      return new ServerManagerForTest(master);
+    }
+  }
+
+  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+  private static TableName NAME = TableName.valueOf("Report");
+
+  private static byte[] CF = Bytes.toBytes("cf");
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    UTIL.getConfiguration().setClass(HConstants.MASTER_IMPL, HMasterForTest.class, HMaster.class);
+    UTIL.getConfiguration().setInt("hbase.regionserver.msginterval", 1000);
+    UTIL.startMiniCluster(3);
+    UTIL.getAdmin().balancerSwitch(false, true);
+    UTIL.createTable(NAME, CF);
+    UTIL.waitTableAvailable(NAME);
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    UTIL.shutdownMiniCluster();
+  }
+
+  @Test
+  public void test() throws HBaseIOException, InterruptedException, ExecutionException {
+    RegionInfo region = UTIL.getMiniHBaseCluster().getRegions(NAME).get(0).getRegionInfo();
+    AssignmentManager am = UTIL.getMiniHBaseCluster().getMaster().getAssignmentManager();
+    RegionStateNode rsn = am.getRegionStates().getRegionStateNode(region);
+
+    // move from rs0 to rs1, and then kill rs0. Later add rs1 to exclude servers, and at last verify
+    // that the region should not be on rs1 and rs2 both.
+    HRegionServer rs0 = UTIL.getMiniHBaseCluster().getRegionServer(rsn.getRegionLocation());
+    HRegionServer rs1 = UTIL.getOtherRegionServer(rs0);
+    HRegionServer rs2 = UTIL.getMiniHBaseCluster().getRegionServerThreads().stream()
+      .map(t -> t.getRegionServer()).filter(rs -> rs != rs0 && rs != rs1).findAny().get();
+
+    RESUME_REPORT = new CountDownLatch(1);
+    ARRIVE_REPORT = new CountDownLatch(1);
+    Future<?> future =
+      am.moveAsync(new RegionPlan(region, rs0.getServerName(), rs1.getServerName()));
+    ARRIVE_REPORT.await();
+
+    RESUME_GET_REGIONS = new CountDownLatch(1);
+    ARRIVE_GET_REGIONS = new CountDownLatch(1);
+    rs0.abort("For testing!");
+
+    ARRIVE_GET_REGIONS.await();
+    RESUME_REPORT.countDown();
+
+    try {
+      future.get(15, TimeUnit.SECONDS);
+    } catch (TimeoutException e) {
+      // after the fix in HBASE-21508 we will get this exception as the TRSP can not be finished any
+      // more before SCP interrupts it. It's OK.
+    }
+
+    EXCLUDE_SERVERS.add(rs1.getServerName());
+    RESUME_GET_REGIONS.countDown();
+    // wait until there are no running procedures, no SCP and no TRSP
+    UTIL.waitFor(30000, () -> UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor()
+      .getActiveProcIds().isEmpty());
+    boolean onRS1 = !rs1.getRegions(NAME).isEmpty();
+    boolean onRS2 = !rs2.getRegions(NAME).isEmpty();
+    assertNotEquals(
+      "should either be on rs1 or rs2, but onRS1 is " + onRS1 + " and on RS2 is " + onRS2, onRS1,
+      onRS2);
+  }
+}