HBASE-21508 Ignore the reportRegionStateTransition call from a dead server

Signed-off-by: Guanghao Zhang <zghao@apache.org>
This commit is contained in:
Duo Zhang 2018-11-26 11:28:08 +08:00
parent 7f02645741
commit 33c4da5558
13 changed files with 420 additions and 225 deletions

View File

@ -3774,8 +3774,7 @@ public class HMaster extends HRegionServer implements MasterServices {
if (offload) {
final List<ServerName> destServers = this.serverManager.createDestinationServersList();
for (ServerName server : serversAdded) {
final List<RegionInfo> regionsOnServer =
this.assignmentManager.getRegionStates().getServerRegionInfoSet(server);
final List<RegionInfo> regionsOnServer = this.assignmentManager.getRegionsOnServer(server);
for (RegionInfo hri : regionsOnServer) {
ServerName dest = balancer.randomAssignment(hri, destServers);
if (dest == null) {

View File

@ -53,7 +53,6 @@ import org.apache.hadoop.hbase.master.MetricsAssignmentManager;
import org.apache.hadoop.hbase.master.RegionPlan;
import org.apache.hadoop.hbase.master.RegionState;
import org.apache.hadoop.hbase.master.RegionState.State;
import org.apache.hadoop.hbase.master.ServerListener;
import org.apache.hadoop.hbase.master.TableStateManager;
import org.apache.hadoop.hbase.master.balancer.FavoredStochasticBalancer;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
@ -99,7 +98,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProto
* Unassigns are triggered by DisableTable, Split, Merge
*/
@InterfaceAudience.Private
public class AssignmentManager implements ServerListener {
public class AssignmentManager {
private static final Logger LOG = LoggerFactory.getLogger(AssignmentManager.class);
// TODO: AMv2
@ -193,9 +192,6 @@ public class AssignmentManager implements ServerListener {
LOG.trace("Starting assignment manager");
// Register Server Listener
master.getServerManager().registerListener(this);
// Start the Assignment Thread
startAssignmentThread();
@ -275,9 +271,6 @@ public class AssignmentManager implements ServerListener {
// Stop the RegionStateStore
regionStates.clear();
// Unregister Server Listener
master.getServerManager().unregisterListener(this);
// Update meta events (for testing)
if (hasProcExecutor) {
metaLoadEvent.suspend();
@ -319,14 +312,31 @@ public class AssignmentManager implements ServerListener {
return regionStates;
}
/**
 * Looks up the regions currently recorded against the given server.
 * <p/>
 * For SCP, once the SCP has been submitted nobody can change the region list of the
 * ServerStateNode any more, so no locking is needed here. For every other caller the returned
 * list is only a snapshot: regions may be moved onto or off the server immediately after this
 * method returns, so do not rely on it where strong consistency is required.
 * @param serverName the server whose regions are wanted
 * @return the regions tracked for the server, or an empty list if there is no ServerStateNode
 *         for it
 */
public List<RegionInfo> getRegionsOnServer(ServerName serverName) {
  ServerStateNode node = regionStates.getServerNode(serverName);
  if (node != null) {
    return node.getRegionInfoList();
  }
  // unknown server: nothing is hosted there as far as we know
  return Collections.emptyList();
}
public RegionStateStore getRegionStateStore() {
return regionStateStore;
}
public List<ServerName> getFavoredNodes(final RegionInfo regionInfo) {
return this.shouldAssignRegionsWithFavoredNodes?
((FavoredStochasticBalancer)getBalancer()).getFavoredNodes(regionInfo):
ServerName.EMPTY_SERVER_LIST;
return this.shouldAssignRegionsWithFavoredNodes
? ((FavoredStochasticBalancer) getBalancer()).getFavoredNodes(regionInfo)
: ServerName.EMPTY_SERVER_LIST;
}
// ============================================================================================
@ -522,12 +532,11 @@ public class AssignmentManager implements ServerListener {
}
private List<RegionInfo> getSystemTables(ServerName serverName) {
Set<RegionStateNode> regions = this.getRegionStates().getServerNode(serverName).getRegions();
if (regions == null) {
ServerStateNode serverNode = regionStates.getServerNode(serverName);
if (serverNode == null) {
return Collections.emptyList();
}
return regions.stream().map(RegionStateNode::getRegionInfo)
.filter(r -> r.getTable().isSystemTable()).collect(Collectors.toList());
return serverNode.getSystemRegionInfoList();
}
private void preTransitCheck(RegionStateNode regionNode, RegionState.State[] expectedStates)
@ -817,54 +826,79 @@ public class AssignmentManager implements ServerListener {
// ============================================================================================
// RS Region Transition Report helpers
// ============================================================================================
// TODO: Move this code in MasterRpcServices and call on specific event?
/**
 * Applies every transition in {@code transitionList} by dispatching on its transition code to
 * the matching update method: open/close reports go to updateRegionTransition, split reports to
 * updateRegionSplitTransition and merge reports to updateRegionMergeTransition. Transition codes
 * not listed in the switch are skipped.
 * @param builder the response builder of the calling RPC; this method does not use it
 * @param serverName the region server that sent the report
 * @param transitionList the transitions to apply, processed in list order
 * @throws IOException declared so the caller can turn a failed update into an error response
 */
private void reportRegionStateTransition(ReportRegionStateTransitionResponse.Builder builder,
    ServerName serverName, List<RegionStateTransition> transitionList) throws IOException {
  for (RegionStateTransition transition : transitionList) {
    switch (transition.getTransitionCode()) {
      case OPENED:
      case FAILED_OPEN:
      case CLOSED: {
        // open/close reports carry exactly one region
        assert transition.getRegionInfoCount() == 1 : transition;
        RegionInfo region = ProtobufUtil.toRegionInfo(transition.getRegionInfo(0));
        long openSeqNum = HConstants.NO_SEQNUM;
        if (transition.hasOpenSeqNum()) {
          openSeqNum = transition.getOpenSeqNum();
        }
        updateRegionTransition(serverName, transition.getTransitionCode(), region, openSeqNum);
        break;
      }
      case READY_TO_SPLIT:
      case SPLIT:
      case SPLIT_REVERTED: {
        // split reports carry the parent followed by the two daughters
        assert transition.getRegionInfoCount() == 3 : transition;
        RegionInfo splitParent = ProtobufUtil.toRegionInfo(transition.getRegionInfo(0));
        RegionInfo daughterA = ProtobufUtil.toRegionInfo(transition.getRegionInfo(1));
        RegionInfo daughterB = ProtobufUtil.toRegionInfo(transition.getRegionInfo(2));
        updateRegionSplitTransition(serverName, transition.getTransitionCode(), splitParent,
          daughterA, daughterB);
        break;
      }
      case READY_TO_MERGE:
      case MERGED:
      case MERGE_REVERTED: {
        // merge reports carry the merged region followed by the two source regions
        assert transition.getRegionInfoCount() == 3 : transition;
        RegionInfo mergedRegion = ProtobufUtil.toRegionInfo(transition.getRegionInfo(0));
        RegionInfo sourceA = ProtobufUtil.toRegionInfo(transition.getRegionInfo(1));
        RegionInfo sourceB = ProtobufUtil.toRegionInfo(transition.getRegionInfo(2));
        updateRegionMergeTransition(serverName, transition.getTransitionCode(), mergedRegion,
          sourceA, sourceB);
        break;
      }
    }
  }
}
public ReportRegionStateTransitionResponse reportRegionStateTransition(
final ReportRegionStateTransitionRequest req) throws PleaseHoldException {
final ReportRegionStateTransitionResponse.Builder builder =
ReportRegionStateTransitionResponse.Builder builder =
ReportRegionStateTransitionResponse.newBuilder();
final ServerName serverName = ProtobufUtil.toServerName(req.getServer());
ServerName serverName = ProtobufUtil.toServerName(req.getServer());
ServerStateNode serverNode = regionStates.getOrCreateServer(serverName);
// Here we have to acquire a read lock instead of a simple exclusive lock, because we should
// not block other reportRegionStateTransition calls from the same region server. This is not
// only about performance, but also about preventing deadlock. Suppose the meta region is also
// on the same region server and you hold the lock, which blocks the
// reportRegionStateTransition call for meta; since meta is not online, you would then block
// inside the lock protection waiting for meta to come online...
serverNode.readLock().lock();
try {
for (RegionStateTransition transition: req.getTransitionList()) {
switch (transition.getTransitionCode()) {
case OPENED:
case FAILED_OPEN:
case CLOSED:
assert transition.getRegionInfoCount() == 1 : transition;
final RegionInfo hri = ProtobufUtil.toRegionInfo(transition.getRegionInfo(0));
updateRegionTransition(serverName, transition.getTransitionCode(), hri,
transition.hasOpenSeqNum() ? transition.getOpenSeqNum() : HConstants.NO_SEQNUM);
break;
case READY_TO_SPLIT:
case SPLIT:
case SPLIT_REVERTED:
assert transition.getRegionInfoCount() == 3 : transition;
final RegionInfo parent = ProtobufUtil.toRegionInfo(transition.getRegionInfo(0));
final RegionInfo splitA = ProtobufUtil.toRegionInfo(transition.getRegionInfo(1));
final RegionInfo splitB = ProtobufUtil.toRegionInfo(transition.getRegionInfo(2));
updateRegionSplitTransition(serverName, transition.getTransitionCode(),
parent, splitA, splitB);
break;
case READY_TO_MERGE:
case MERGED:
case MERGE_REVERTED:
assert transition.getRegionInfoCount() == 3 : transition;
final RegionInfo merged = ProtobufUtil.toRegionInfo(transition.getRegionInfo(0));
final RegionInfo mergeA = ProtobufUtil.toRegionInfo(transition.getRegionInfo(1));
final RegionInfo mergeB = ProtobufUtil.toRegionInfo(transition.getRegionInfo(2));
updateRegionMergeTransition(serverName, transition.getTransitionCode(),
merged, mergeA, mergeB);
break;
// We only accept reportRegionStateTransition if the region server is online; see the comment
// in the submitServerCrash method and HBASE-21508 for more details.
if (serverNode.isInState(ServerState.ONLINE)) {
try {
reportRegionStateTransition(builder, serverName, req.getTransitionList());
} catch (PleaseHoldException e) {
LOG.trace("Failed transition ", e);
throw e;
} catch (UnsupportedOperationException | IOException e) {
// TODO: at the moment we have a single error message and the RS will abort
// if the master says that one of the region transitions failed.
LOG.warn("Failed transition", e);
builder.setErrorMessage("Failed transition " + e.getMessage());
}
} else {
LOG.warn("The region server {} is already dead, skip reportRegionStateTransition call",
serverName);
builder.setErrorMessage("You are dead");
}
} catch (PleaseHoldException e) {
LOG.trace("Failed transition ", e);
throw e;
} catch (UnsupportedOperationException|IOException e) {
// TODO: at the moment we have a single error message and the RS will abort
// if the master says that one of the region transitions failed.
LOG.warn("Failed transition", e);
builder.setErrorMessage("Failed transition " + e.getMessage());
} finally {
serverNode.readLock().unlock();
}
return builder.build();
}
@ -1017,9 +1051,6 @@ public class AssignmentManager implements ServerListener {
}
// The heartbeat tells us which regions are on the region server; check their state.
checkOnlineRegionsReport(serverNode, regionNames);
// wake report event
wakeServerReportEvent(serverNode);
}
// just check and output possible inconsistency, without actually doing anything
@ -1061,18 +1092,6 @@ public class AssignmentManager implements ServerListener {
}
}
protected boolean waitServerReportEvent(ServerName serverName, Procedure<?> proc) {
final ServerStateNode serverNode = regionStates.getOrCreateServer(serverName);
if (serverNode == null) {
LOG.warn("serverName=null; {}", proc);
}
return serverNode.getReportEvent().suspendIfNotReady(proc);
}
protected void wakeServerReportEvent(final ServerStateNode serverNode) {
serverNode.getReportEvent().wake(getProcedureScheduler());
}
// ============================================================================================
// RIT chore
// ============================================================================================
@ -1321,13 +1340,27 @@ public class AssignmentManager implements ServerListener {
return 0;
}
public long submitServerCrash(final ServerName serverName, final boolean shouldSplitWal) {
boolean carryingMeta = isCarryingMeta(serverName);
ProcedureExecutor<MasterProcedureEnv> procExec = this.master.getMasterProcedureExecutor();
long pid = procExec.submitProcedure(new ServerCrashProcedure(procExec.getEnvironment(),
serverName, shouldSplitWal, carryingMeta));
LOG.debug("Added=" + serverName
+ " to dead servers, submitted shutdown handler to be executed meta=" + carryingMeta);
/**
* Marks the given server as crashed and submits a ServerCrashProcedure for it.
* <p/>
* The state change to CRASHED, the carrying-meta check and the procedure submission are all done
* while holding the write lock of the server's ServerStateNode; the lock is released before the
* result is logged.
* @param serverName the server to process; its ServerStateNode is created if it does not exist
* @param shouldSplitWal passed through to the ServerCrashProcedure constructor
* @return the value returned by ProcedureExecutor.submitProcedure for the new procedure
*/
public long submitServerCrash(ServerName serverName, boolean shouldSplitWal) {
boolean carryingMeta;
long pid;
ServerStateNode serverNode = regionStates.getOrCreateServer(serverName);
// we hold the write lock here for fencing on reportRegionStateTransition. Once we set the
// server state to CRASHED, we will no longer accept the reportRegionStateTransition call from
// this server. This is used to simplify the implementation for TRSP and SCP, where we can make
// sure that, the region list fetched by SCP will not be changed any more.
serverNode.writeLock().lock();
try {
serverNode.setState(ServerState.CRASHED);
carryingMeta = isCarryingMeta(serverName);
ProcedureExecutor<MasterProcedureEnv> procExec = this.master.getMasterProcedureExecutor();
pid = procExec.submitProcedure(new ServerCrashProcedure(procExec.getEnvironment(), serverName,
shouldSplitWal, carryingMeta));
} finally {
// always release the fence, even if submitting the procedure throws
serverNode.writeLock().unlock();
}
LOG.info(
"Added {} to dead servers which carryingMeta={}, submitted ServerCrashProcedure pid={}",
serverName, carryingMeta, pid);
return pid;
}
@ -1847,22 +1880,6 @@ public class AssignmentManager implements ServerListener {
.collect(Collectors.toList());
}
// ============================================================================================
// Server Helpers
// ============================================================================================
@Override
public void serverAdded(final ServerName serverName) {
}
@Override
public void serverRemoved(final ServerName serverName) {
final ServerStateNode serverNode = regionStates.getServerNode(serverName);
if (serverNode == null) return;
// just in case, wake procedures waiting for this server report
wakeServerReportEvent(serverNode);
}
@VisibleForTesting
MasterServices getMaster() {
return master;

View File

@ -357,22 +357,6 @@ public class RegionStates {
((hri.isOffline() || hri.isSplit()) && offline);
}
/**
* Returns the set of regions hosted by the specified server
* @param serverName the server we are interested in
* @return set of RegionInfo hosted by the specified server
*/
public List<RegionInfo> getServerRegionInfoSet(final ServerName serverName) {
ServerStateNode serverInfo = getServerNode(serverName);
if (serverInfo == null) {
return Collections.emptyList();
}
synchronized (serverInfo) {
return serverInfo.getRegionInfoList();
}
}
// ============================================================================================
// Split helpers
// These methods will only be called in ServerCrashProcedure, and at the end of SCP we will remove

View File

@ -29,6 +29,11 @@ enum ServerState {
*/
ONLINE,
/**
* Indicate that the server has crashed, i.e., we have already scheduled a SCP for it.
*/
CRASHED,
/**
* Only server which carries meta can have this state. We will split wal for meta and then
* assign meta first before splitting other wals.

View File

@ -17,12 +17,15 @@
*/
package org.apache.hadoop.hbase.master.assignment;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
import org.apache.yetus.audience.InterfaceAudience;
/**
@ -31,23 +34,16 @@ import org.apache.yetus.audience.InterfaceAudience;
@InterfaceAudience.Private
class ServerStateNode implements Comparable<ServerStateNode> {
private static final class ServerReportEvent extends ProcedureEvent<ServerName> {
public ServerReportEvent(final ServerName serverName) {
super(serverName);
}
}
private final ServerReportEvent reportEvent;
private final Set<RegionStateNode> regions;
private final ServerName serverName;
private final ReadWriteLock lock = new ReentrantReadWriteLock();
private volatile ServerState state = ServerState.ONLINE;
public ServerStateNode(final ServerName serverName) {
public ServerStateNode(ServerName serverName) {
this.serverName = serverName;
this.regions = ConcurrentHashMap.newKeySet();
this.reportEvent = new ServerReportEvent(serverName);
}
public ServerName getServerName() {
@ -58,10 +54,6 @@ class ServerStateNode implements Comparable<ServerStateNode> {
return state;
}
public ProcedureEvent<?> getReportEvent() {
return reportEvent;
}
public boolean isInState(final ServerState... expected) {
boolean expectedState = false;
if (expected != null) {
@ -76,20 +68,17 @@ class ServerStateNode implements Comparable<ServerStateNode> {
this.state = state;
}
public Set<RegionStateNode> getRegions() {
return regions;
}
public int getRegionCount() {
return regions.size();
}
public ArrayList<RegionInfo> getRegionInfoList() {
ArrayList<RegionInfo> hris = new ArrayList<RegionInfo>(regions.size());
for (RegionStateNode region : regions) {
hris.add(region.getRegionInfo());
}
return hris;
public List<RegionInfo> getRegionInfoList() {
return regions.stream().map(RegionStateNode::getRegionInfo).collect(Collectors.toList());
}
/**
 * Collects the RegionInfo of every region on this server whose RegionStateNode reports
 * {@code isSystemTable()}.
 * @return a newly collected list; empty when no region on this server matches
 */
public List<RegionInfo> getSystemRegionInfoList() {
  return regions.stream()
    .filter(node -> node.isSystemTable())
    .map(node -> node.getRegionInfo())
    .collect(Collectors.toList());
}
public void addRegion(final RegionStateNode regionNode) {
@ -100,6 +89,14 @@ class ServerStateNode implements Comparable<ServerStateNode> {
this.regions.remove(regionNode);
}
/**
 * @return the read side of this node's read/write lock
 */
public Lock readLock() {
  return this.lock.readLock();
}
/**
 * @return the write side of this node's read/write lock
 */
public Lock writeLock() {
  return this.lock.writeLock();
}
@Override
public int compareTo(final ServerStateNode other) {
return getServerName().compareTo(other.getServerName());

View File

@ -500,11 +500,9 @@ public class TransitRegionStateProcedure
case REGION_STATE_TRANSITION_CONFIRM_CLOSED:
case REGION_STATE_TRANSITION_CONFIRM_OPENED:
// for these 3 states, the region may still be online on the crashed server
if (serverName.equals(regionNode.getRegionLocation())) {
env.getAssignmentManager().regionClosed(regionNode, false);
if (currentState != RegionStateTransitionState.REGION_STATE_TRANSITION_CLOSE) {
regionNode.getProcedureEvent().wake(env.getProcedureScheduler());
}
env.getAssignmentManager().regionClosed(regionNode, false);
if (currentState != RegionStateTransitionState.REGION_STATE_TRANSITION_CLOSE) {
regionNode.getProcedureEvent().wake(env.getProcedureScheduler());
}
break;
default:

View File

@ -61,52 +61,62 @@ public final class ProcedureSyncWait {
}
private static class ProcedureFuture implements Future<byte[]> {
private final ProcedureExecutor<MasterProcedureEnv> procExec;
private final Procedure<?> proc;
private final ProcedureExecutor<MasterProcedureEnv> procExec;
private final Procedure<?> proc;
private boolean hasResult = false;
private byte[] result = null;
private boolean hasResult = false;
private byte[] result = null;
public ProcedureFuture(ProcedureExecutor<MasterProcedureEnv> procExec, Procedure<?> proc) {
this.procExec = procExec;
this.proc = proc;
public ProcedureFuture(ProcedureExecutor<MasterProcedureEnv> procExec, Procedure<?> proc) {
this.procExec = procExec;
this.proc = proc;
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return false;
}
@Override
public boolean isCancelled() {
return false;
}
@Override
public boolean isDone() {
return hasResult;
}
@Override
public byte[] get() throws InterruptedException, ExecutionException {
if (hasResult) {
return result;
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) { return false; }
@Override
public boolean isCancelled() { return false; }
@Override
public boolean isDone() { return hasResult; }
@Override
public byte[] get() throws InterruptedException, ExecutionException {
if (hasResult) return result;
try {
return waitForProcedureToComplete(procExec, proc, Long.MAX_VALUE);
} catch (Exception e) {
throw new ExecutionException(e);
}
}
@Override
public byte[] get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
if (hasResult) return result;
try {
result = waitForProcedureToComplete(procExec, proc, unit.toMillis(timeout));
hasResult = true;
return result;
} catch (TimeoutIOException e) {
throw new TimeoutException(e.getMessage());
} catch (Exception e) {
throw new ExecutionException(e);
}
try {
return waitForProcedureToComplete(procExec, proc, Long.MAX_VALUE);
} catch (Exception e) {
throw new ExecutionException(e);
}
}
@Override
public byte[] get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
if (hasResult) {
return result;
}
try {
result = waitForProcedureToComplete(procExec, proc, unit.toMillis(timeout));
hasResult = true;
return result;
} catch (TimeoutIOException e) {
throw new TimeoutException(e.getMessage());
} catch (Exception e) {
throw new ExecutionException(e);
}
}
}
public static Future<byte[]> submitProcedure(final ProcedureExecutor<MasterProcedureEnv> procExec,
final Procedure<MasterProcedureEnv> proc) {
if (proc.isInitializing()) {
@ -124,9 +134,8 @@ public final class ProcedureSyncWait {
}
public static byte[] waitForProcedureToCompleteIOE(
final ProcedureExecutor<MasterProcedureEnv> procExec,
final Procedure<?> proc, final long timeout)
throws IOException {
final ProcedureExecutor<MasterProcedureEnv> procExec, final Procedure<?> proc,
final long timeout) throws IOException {
try {
return waitForProcedureToComplete(procExec, proc, timeout);
} catch (IOException e) {
@ -139,7 +148,7 @@ public final class ProcedureSyncWait {
public static byte[] waitForProcedureToComplete(
final ProcedureExecutor<MasterProcedureEnv> procExec, final Procedure<?> proc,
final long timeout) throws IOException {
waitFor(procExec.getEnvironment(), "pid=" + proc.getProcId(),
waitFor(procExec.getEnvironment(), timeout, "pid=" + proc.getProcId(),
new ProcedureSyncWait.Predicate<Boolean>() {
@Override
public Boolean evaluate() throws IOException {
@ -171,15 +180,25 @@ public final class ProcedureSyncWait {
public static <T> T waitFor(MasterProcedureEnv env, String purpose, Predicate<T> predicate)
throws IOException {
final Configuration conf = env.getMasterConfiguration();
final long waitTime = conf.getLong("hbase.master.wait.on.region", 5 * 60 * 1000);
final long waitingTimeForEvents = conf.getInt("hbase.master.event.waiting.time", 1000);
Configuration conf = env.getMasterConfiguration();
long waitTime = conf.getLong("hbase.master.wait.on.region", 5 * 60 * 1000);
return waitFor(env, waitTime, purpose, predicate);
}
public static <T> T waitFor(MasterProcedureEnv env, long waitTime, String purpose,
Predicate<T> predicate) throws IOException {
Configuration conf = env.getMasterConfiguration();
long waitingTimeForEvents = conf.getInt("hbase.master.event.waiting.time", 1000);
return waitFor(env, waitTime, waitingTimeForEvents, purpose, predicate);
}
public static <T> T waitFor(MasterProcedureEnv env, long waitTime, long waitingTimeForEvents,
String purpose, Predicate<T> predicate) throws IOException {
final long done = EnvironmentEdgeManager.currentTime() + waitTime;
long done = EnvironmentEdgeManager.currentTime() + waitTime;
if (done <= 0) {
// long overflow, usually this means we pass Long.MAX_VALUE as waitTime
done = Long.MAX_VALUE;
}
boolean logged = false;
do {
T result = predicate.evaluate();

View File

@ -146,7 +146,7 @@ public class ServerCrashProcedure
break;
case SERVER_CRASH_GET_REGIONS:
this.regionsOnCrashedServer =
services.getAssignmentManager().getRegionStates().getServerRegionInfoSet(serverName);
services.getAssignmentManager().getRegionsOnServer(serverName);
// Where to go next? Depends on whether we should split logs at all or
// if we should do distributed log splitting.
if (!this.shouldSplitWal) {

View File

@ -713,12 +713,12 @@ public class TestAdmin2 {
assertEquals(3, clusterRegionServers.size());
HashMap<ServerName, List<RegionInfo>> serversToDecommssion = new HashMap<>();
// Get a server that has regions. We will decommission two of the servers,
// Get a server that has meta online. We will decommission two of the servers,
// leaving one online.
int i;
for (i = 0; i < clusterRegionServers.size(); i++) {
List<RegionInfo> regionsOnServer = admin.getRegions(clusterRegionServers.get(i));
if (regionsOnServer.size() > 0) {
if (admin.getRegions(clusterRegionServers.get(i)).stream().anyMatch(p -> p.isMetaRegion())) {
serversToDecommssion.put(clusterRegionServers.get(i), regionsOnServer);
break;
}

View File

@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.master.assignment;
import static org.mockito.ArgumentMatchers.any;
import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
@ -28,7 +27,6 @@ import java.util.SortedSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.ServerLoad;
import org.apache.hadoop.hbase.ServerMetricsBuilder;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableDescriptors;
@ -51,7 +49,6 @@ import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
@ -93,19 +90,15 @@ public class MockMasterServices extends MockNoopMasterServices {
private final ClusterConnection connection;
private final LoadBalancer balancer;
private final ServerManager serverManager;
// Set of regions on a 'server'. Populated externally. Used in below faking 'cluster'.
private final NavigableMap<ServerName, SortedSet<byte []>> regionsToRegionServers;
private final ProcedureEvent initialized = new ProcedureEvent("master initialized");
private final ProcedureEvent<?> initialized = new ProcedureEvent<>("master initialized");
public static final String DEFAULT_COLUMN_FAMILY_NAME = "cf";
public static final ServerName MOCK_MASTER_SERVERNAME =
ServerName.valueOf("mockmaster.example.org", 1234, -1L);
public MockMasterServices(Configuration conf,
NavigableMap<ServerName, SortedSet<byte []>> regionsToRegionServers)
throws IOException {
NavigableMap<ServerName, SortedSet<byte[]>> regionsToRegionServers) throws IOException {
super(conf);
this.regionsToRegionServers = regionsToRegionServers;
Superusers.initialize(conf);
this.fileSystemManager = new MasterFileSystem(conf);
this.walManager = new MasterWalManager(this);
@ -120,15 +113,6 @@ public class MockMasterServices extends MockNoopMasterServices {
public boolean isTableDisabled(final TableName tableName) {
return false;
}
@Override
protected boolean waitServerReportEvent(ServerName serverName, Procedure proc) {
// Make a report with current state of the server 'serverName' before we call wait..
SortedSet<byte[]> regions = regionsToRegionServers.get(serverName);
getAssignmentManager().reportOnlineRegions(serverName,
regions == null ? new HashSet<byte[]>() : regions);
return super.waitServerReportEvent(serverName, proc);
}
};
this.balancer = LoadBalancerFactory.getLoadBalancer(conf);
this.serverManager = new ServerManager(this);
@ -176,7 +160,7 @@ public class MockMasterServices extends MockNoopMasterServices {
this.assignmentManager.start();
for (int i = 0; i < numServes; ++i) {
ServerName sn = ServerName.valueOf("localhost", 100 + i, 1);
serverManager.regionServerReport(sn, new ServerLoad(ServerMetricsBuilder.of(sn)));
serverManager.regionServerReport(sn, ServerMetricsBuilder.of(sn));
}
this.procedureExecutor.getEnvironment().setEventReady(initialized, true);
}
@ -202,7 +186,7 @@ public class MockMasterServices extends MockNoopMasterServices {
return;
}
ServerName sn = ServerName.valueOf(serverName.getAddress().toString(), startCode);
serverManager.regionServerReport(sn, new ServerLoad(ServerMetricsBuilder.of(sn)));
serverManager.regionServerReport(sn, ServerMetricsBuilder.of(sn));
}
@Override
@ -260,7 +244,7 @@ public class MockMasterServices extends MockNoopMasterServices {
}
@Override
public ProcedureEvent getInitializedEvent() {
public ProcedureEvent<?> getInitializedEvent() {
return this.initialized;
}

View File

@ -27,7 +27,6 @@ import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.exceptions.UnexpectedStateException;
import org.apache.hadoop.hbase.master.RegionState.State;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
import org.apache.hadoop.hbase.procedure2.util.StringUtils;
@ -49,14 +48,6 @@ public class TestAssignmentManager extends TestAssignmentManagerBase {
private static final Logger LOG = LoggerFactory.getLogger(TestAssignmentManager.class);
@Test(expected = NullPointerException.class)
public void testWaitServerReportEventWithNullServer() throws UnexpectedStateException {
// Test what happens if we pass in null server. I'd expect it throws NPE.
if (this.am.waitServerReportEvent(null, null)) {
throw new UnexpectedStateException();
}
}
@Test
public void testAssignWithGoodExec() throws Exception {
// collect AM metrics before test

View File

@ -186,7 +186,7 @@ public abstract class TestAssignmentManagerBase {
protected byte[] waitOnFuture(final Future<byte[]> future) throws Exception {
try {
return future.get(5, TimeUnit.SECONDS);
return future.get(60, TimeUnit.SECONDS);
} catch (ExecutionException e) {
LOG.info("ExecutionException", e);
Exception ee = (Exception) e.getCause();

View File

@ -0,0 +1,201 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.assignment;
import static org.junit.Assert.assertNotEquals;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.PleaseHoldException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.RegionPlan;
import org.apache.hadoop.hbase.master.ServerManager;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.zookeeper.KeeperException;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse;
/**
 * Testcase for HBASE-21508. Moves a region from rs0 to rs1, aborts rs0 while the
 * reportRegionStateTransition call for the move is still parked inside the master, and then
 * verifies that once all procedures have finished the region is online on exactly one of the two
 * surviving region servers.
 * <p/>
 * The interleaving is driven by pairs of latches: a hook counts down its ARRIVE latch when it is
 * reached and then blocks on the matching RESUME latch until the test thread releases it.
 */
@Category({ MasterTests.class, MediumTests.class })
public class TestReportRegionStateTransitionFromDeadServer {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestReportRegionStateTransitionFromDeadServer.class);

  // Servers handed to ServerManager.createDestinationServersList as the exclude list. The test
  // thread adds to it right before releasing RESUME_GET_REGIONS, so the latch orders that write
  // before whatever the thread parked in getRegionsOnServer does next.
  private static final List<ServerName> EXCLUDE_SERVERS = new ArrayList<>();

  // The latches are assigned by the test thread and read by master side threads, so they must be
  // volatile. A RESUME latch is always assigned before its ARRIVE latch, hence a hook which
  // observes a non-null ARRIVE latch is guaranteed to also observe the RESUME latch.
  private static volatile CountDownLatch ARRIVE_GET_REGIONS;

  private static volatile CountDownLatch RESUME_GET_REGIONS;

  private static volatile CountDownLatch ARRIVE_REPORT;

  private static volatile CountDownLatch RESUME_REPORT;

  /**
   * Blocks until the given latch reaches zero. If the waiting thread is interrupted we stop
   * waiting but restore the interrupt status, so that the code which runs afterwards can still
   * notice the interruption.
   */
  private static void awaitLatch(CountDownLatch latch) {
    try {
      latch.await();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  /**
   * A ServerManager whose destination server list never contains the servers in
   * {@link #EXCLUDE_SERVERS}.
   */
  private static final class ServerManagerForTest extends ServerManager {

    public ServerManagerForTest(MasterServices master) {
      super(master);
    }

    @Override
    public List<ServerName> createDestinationServersList() {
      return super.createDestinationServersList(EXCLUDE_SERVERS);
    }
  }

  /**
   * An AssignmentManager which can be paused by the test inside
   * {@link #getRegionsOnServer(ServerName)} and
   * {@link #reportRegionStateTransition(ReportRegionStateTransitionRequest)}.
   */
  private static final class AssignmentManagerForTest extends AssignmentManager {

    public AssignmentManagerForTest(MasterServices master) {
      super(master);
    }

    /**
     * Fetches the regions first and then, if the test has armed the latches, signals the arrival
     * and waits to be resumed, so the caller continues with a list taken before the pause.
     */
    @Override
    public List<RegionInfo> getRegionsOnServer(ServerName serverName) {
      List<RegionInfo> regions = super.getRegionsOnServer(serverName);
      CountDownLatch arrive = ARRIVE_GET_REGIONS;
      if (arrive != null) {
        arrive.countDown();
        awaitLatch(RESUME_GET_REGIONS);
      }
      return regions;
    }

    /**
     * Holds back the report until the test resumes it. Requests which contain a transition for
     * the meta region are passed through directly and never paused.
     */
    @Override
    public ReportRegionStateTransitionResponse reportRegionStateTransition(
        ReportRegionStateTransitionRequest req) throws PleaseHoldException {
      CountDownLatch arrive = ARRIVE_REPORT;
      if (arrive != null && req.getTransitionList().stream()
        .allMatch(t -> !ProtobufUtil.toRegionInfo(t.getRegionInfo(0)).isMetaRegion())) {
        arrive.countDown();
        awaitLatch(RESUME_REPORT);
      }
      return super.reportRegionStateTransition(req);
    }
  }

  /**
   * The master implementation used by the mini cluster, it plugs in the instrumented
   * AssignmentManager and ServerManager above.
   */
  public static final class HMasterForTest extends HMaster {

    public HMasterForTest(Configuration conf) throws IOException, KeeperException {
      super(conf);
    }

    @Override
    protected AssignmentManager createAssignmentManager(MasterServices master) {
      return new AssignmentManagerForTest(master);
    }

    @Override
    protected ServerManager createServerManager(MasterServices master) throws IOException {
      setupClusterConnection();
      return new ServerManagerForTest(master);
    }
  }

  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();

  private static final TableName NAME = TableName.valueOf("Report");

  private static final byte[] CF = Bytes.toBytes("cf");

  @BeforeClass
  public static void setUp() throws Exception {
    UTIL.getConfiguration().setClass(HConstants.MASTER_IMPL, HMasterForTest.class, HMaster.class);
    UTIL.getConfiguration().setInt("hbase.regionserver.msginterval", 1000);
    UTIL.startMiniCluster(3);
    // turn off the balancer so that only the test itself moves the region
    UTIL.getAdmin().balancerSwitch(false, true);
    UTIL.createTable(NAME, CF);
    UTIL.waitTableAvailable(NAME);
  }

  @AfterClass
  public static void tearDown() throws Exception {
    UTIL.shutdownMiniCluster();
  }

  @Test
  public void test() throws HBaseIOException, InterruptedException, ExecutionException {
    RegionInfo region = UTIL.getMiniHBaseCluster().getRegions(NAME).get(0).getRegionInfo();
    AssignmentManager am = UTIL.getMiniHBaseCluster().getMaster().getAssignmentManager();
    RegionStateNode rsn = am.getRegionStates().getRegionStateNode(region);

    // move from rs0 to rs1, and then kill rs0. Later add rs1 to exclude servers, and at last verify
    // that the region should not be on rs1 and rs2 both.
    HRegionServer rs0 = UTIL.getMiniHBaseCluster().getRegionServer(rsn.getRegionLocation());
    HRegionServer rs1 = UTIL.getOtherRegionServer(rs0);
    HRegionServer rs2 = UTIL.getMiniHBaseCluster().getRegionServerThreads().stream()
      .map(t -> t.getRegionServer()).filter(rs -> rs != rs0 && rs != rs1).findAny()
      .orElseThrow(() -> new AssertionError("can not find a third region server"));

    // arm the report hook, RESUME first so that the hook never sees a null RESUME latch
    RESUME_REPORT = new CountDownLatch(1);
    ARRIVE_REPORT = new CountDownLatch(1);
    Future<?> future =
      am.moveAsync(new RegionPlan(region, rs0.getServerName(), rs1.getServerName()));
    // a non-meta report is now parked inside the master
    ARRIVE_REPORT.await();

    // arm the getRegionsOnServer hook and kill rs0 while the report is still being held
    RESUME_GET_REGIONS = new CountDownLatch(1);
    ARRIVE_GET_REGIONS = new CountDownLatch(1);
    rs0.abort("For testing!");
    ARRIVE_GET_REGIONS.await();

    // the region list has been fetched, now let the held report go on
    RESUME_REPORT.countDown();
    try {
      future.get(15, TimeUnit.SECONDS);
    } catch (TimeoutException e) {
      // after the fix in HBASE-21508 we will get this exception as the TRSP can not be finished any
      // more before SCP interrupts it. It's OK.
    }

    // make sure rs1 is no longer a candidate before the paused getRegionsOnServer caller goes on
    EXCLUDE_SERVERS.add(rs1.getServerName());
    RESUME_GET_REGIONS.countDown();

    // wait until there are no running procedures, no SCP and no TRSP
    UTIL.waitFor(30000, () -> UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor()
      .getActiveProcIds().isEmpty());

    boolean onRS1 = !rs1.getRegions(NAME).isEmpty();
    boolean onRS2 = !rs2.getRegions(NAME).isEmpty();
    assertNotEquals(
      "should either be on rs1 or rs2, but onRS1 is " + onRS1 + " and on RS2 is " + onRS2, onRS1,
      onRS2);
  }
}