From 33c4da5558752b70d223528a35648c53fbbd10b1 Mon Sep 17 00:00:00 2001 From: Duo Zhang Date: Mon, 26 Nov 2018 11:28:08 +0800 Subject: [PATCH] HBASE-21508 Ignore the reportRegionStateTransition call from a dead server Signed-off-by: Guanghao Zhang --- .../apache/hadoop/hbase/master/HMaster.java | 3 +- .../master/assignment/AssignmentManager.java | 205 ++++++++++-------- .../hbase/master/assignment/RegionStates.java | 16 -- .../hbase/master/assignment/ServerState.java | 5 + .../master/assignment/ServerStateNode.java | 49 ++--- .../TransitRegionStateProcedure.java | 8 +- .../master/procedure/ProcedureSyncWait.java | 115 ++++++---- .../procedure/ServerCrashProcedure.java | 2 +- .../hadoop/hbase/client/TestAdmin2.java | 4 +- .../master/assignment/MockMasterServices.java | 26 +-- .../assignment/TestAssignmentManager.java | 9 - .../assignment/TestAssignmentManagerBase.java | 2 +- ...rtRegionStateTransitionFromDeadServer.java | 201 +++++++++++++++++ 13 files changed, 420 insertions(+), 225 deletions(-) create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestReportRegionStateTransitionFromDeadServer.java diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index 995cb5d638f..f9b100c0a50 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -3774,8 +3774,7 @@ public class HMaster extends HRegionServer implements MasterServices { if (offload) { final List destServers = this.serverManager.createDestinationServersList(); for (ServerName server : serversAdded) { - final List regionsOnServer = - this.assignmentManager.getRegionStates().getServerRegionInfoSet(server); + final List regionsOnServer = this.assignmentManager.getRegionsOnServer(server); for (RegionInfo hri : regionsOnServer) { ServerName dest = 
balancer.randomAssignment(hri, destServers); if (dest == null) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java index 37e5f0c93e1..a564ea925a6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java @@ -53,7 +53,6 @@ import org.apache.hadoop.hbase.master.MetricsAssignmentManager; import org.apache.hadoop.hbase.master.RegionPlan; import org.apache.hadoop.hbase.master.RegionState; import org.apache.hadoop.hbase.master.RegionState.State; -import org.apache.hadoop.hbase.master.ServerListener; import org.apache.hadoop.hbase.master.TableStateManager; import org.apache.hadoop.hbase.master.balancer.FavoredStochasticBalancer; import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; @@ -99,7 +98,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProto * Unassigns are triggered by DisableTable, Split, Merge */ @InterfaceAudience.Private -public class AssignmentManager implements ServerListener { +public class AssignmentManager { private static final Logger LOG = LoggerFactory.getLogger(AssignmentManager.class); // TODO: AMv2 @@ -193,9 +192,6 @@ public class AssignmentManager implements ServerListener { LOG.trace("Starting assignment manager"); - // Register Server Listener - master.getServerManager().registerListener(this); - // Start the Assignment Thread startAssignmentThread(); @@ -275,9 +271,6 @@ public class AssignmentManager implements ServerListener { // Stop the RegionStateStore regionStates.clear(); - // Unregister Server Listener - master.getServerManager().unregisterListener(this); - // Update meta events (for testing) if (hasProcExecutor) { metaLoadEvent.suspend(); @@ -319,14 +312,31 @@ public class AssignmentManager implements 
ServerListener { return regionStates; } + /** + * Returns the regions hosted by the specified server. + *

+ * Notice that, for SCP, after we submit the SCP, no one can change the region list for the + * ServerStateNode so we do not need any locks here. And for other usage, this can only give you a + * snapshot of the current region list for this server, which means, right after you get the + * region list, new regions may be moved to this server or some regions may be moved out from this + * server, so you should not use it critically if you need strong consistency. + */ + public List getRegionsOnServer(ServerName serverName) { + ServerStateNode serverInfo = regionStates.getServerNode(serverName); + if (serverInfo == null) { + return Collections.emptyList(); + } + return serverInfo.getRegionInfoList(); + } + public RegionStateStore getRegionStateStore() { return regionStateStore; } public List getFavoredNodes(final RegionInfo regionInfo) { - return this.shouldAssignRegionsWithFavoredNodes? - ((FavoredStochasticBalancer)getBalancer()).getFavoredNodes(regionInfo): - ServerName.EMPTY_SERVER_LIST; + return this.shouldAssignRegionsWithFavoredNodes + ? 
((FavoredStochasticBalancer) getBalancer()).getFavoredNodes(regionInfo) + : ServerName.EMPTY_SERVER_LIST; } // ============================================================================================ @@ -522,12 +532,11 @@ public class AssignmentManager implements ServerListener { } private List getSystemTables(ServerName serverName) { - Set regions = this.getRegionStates().getServerNode(serverName).getRegions(); - if (regions == null) { + ServerStateNode serverNode = regionStates.getServerNode(serverName); + if (serverNode == null) { return Collections.emptyList(); } - return regions.stream().map(RegionStateNode::getRegionInfo) - .filter(r -> r.getTable().isSystemTable()).collect(Collectors.toList()); + return serverNode.getSystemRegionInfoList(); } private void preTransitCheck(RegionStateNode regionNode, RegionState.State[] expectedStates) @@ -817,54 +826,79 @@ public class AssignmentManager implements ServerListener { // ============================================================================================ // RS Region Transition Report helpers // ============================================================================================ - // TODO: Move this code in MasterRpcServices and call on specific event? + private void reportRegionStateTransition(ReportRegionStateTransitionResponse.Builder builder, + ServerName serverName, List transitionList) throws IOException { + for (RegionStateTransition transition : transitionList) { + switch (transition.getTransitionCode()) { + case OPENED: + case FAILED_OPEN: + case CLOSED: + assert transition.getRegionInfoCount() == 1 : transition; + final RegionInfo hri = ProtobufUtil.toRegionInfo(transition.getRegionInfo(0)); + updateRegionTransition(serverName, transition.getTransitionCode(), hri, + transition.hasOpenSeqNum() ? 
transition.getOpenSeqNum() : HConstants.NO_SEQNUM); + break; + case READY_TO_SPLIT: + case SPLIT: + case SPLIT_REVERTED: + assert transition.getRegionInfoCount() == 3 : transition; + final RegionInfo parent = ProtobufUtil.toRegionInfo(transition.getRegionInfo(0)); + final RegionInfo splitA = ProtobufUtil.toRegionInfo(transition.getRegionInfo(1)); + final RegionInfo splitB = ProtobufUtil.toRegionInfo(transition.getRegionInfo(2)); + updateRegionSplitTransition(serverName, transition.getTransitionCode(), parent, splitA, + splitB); + break; + case READY_TO_MERGE: + case MERGED: + case MERGE_REVERTED: + assert transition.getRegionInfoCount() == 3 : transition; + final RegionInfo merged = ProtobufUtil.toRegionInfo(transition.getRegionInfo(0)); + final RegionInfo mergeA = ProtobufUtil.toRegionInfo(transition.getRegionInfo(1)); + final RegionInfo mergeB = ProtobufUtil.toRegionInfo(transition.getRegionInfo(2)); + updateRegionMergeTransition(serverName, transition.getTransitionCode(), merged, mergeA, + mergeB); + break; + } + } + } + public ReportRegionStateTransitionResponse reportRegionStateTransition( final ReportRegionStateTransitionRequest req) throws PleaseHoldException { - final ReportRegionStateTransitionResponse.Builder builder = + ReportRegionStateTransitionResponse.Builder builder = ReportRegionStateTransitionResponse.newBuilder(); - final ServerName serverName = ProtobufUtil.toServerName(req.getServer()); + ServerName serverName = ProtobufUtil.toServerName(req.getServer()); + ServerStateNode serverNode = regionStates.getOrCreateServer(serverName); + // here we have to acquire a read lock instead of a simple exclusive lock. This is because that + // we should not block other reportRegionStateTransition call from the same region server. This + // is not only about performance, but also to prevent dead lock. 
Think of the meta region is + // also on the same region server and you hold the lock which blocks the + // reportRegionStateTransition for meta, and since meta is not online, you will block inside the + // lock protection to wait for meta online... + serverNode.readLock().lock(); try { - for (RegionStateTransition transition: req.getTransitionList()) { - switch (transition.getTransitionCode()) { - case OPENED: - case FAILED_OPEN: - case CLOSED: - assert transition.getRegionInfoCount() == 1 : transition; - final RegionInfo hri = ProtobufUtil.toRegionInfo(transition.getRegionInfo(0)); - updateRegionTransition(serverName, transition.getTransitionCode(), hri, - transition.hasOpenSeqNum() ? transition.getOpenSeqNum() : HConstants.NO_SEQNUM); - break; - case READY_TO_SPLIT: - case SPLIT: - case SPLIT_REVERTED: - assert transition.getRegionInfoCount() == 3 : transition; - final RegionInfo parent = ProtobufUtil.toRegionInfo(transition.getRegionInfo(0)); - final RegionInfo splitA = ProtobufUtil.toRegionInfo(transition.getRegionInfo(1)); - final RegionInfo splitB = ProtobufUtil.toRegionInfo(transition.getRegionInfo(2)); - updateRegionSplitTransition(serverName, transition.getTransitionCode(), - parent, splitA, splitB); - break; - case READY_TO_MERGE: - case MERGED: - case MERGE_REVERTED: - assert transition.getRegionInfoCount() == 3 : transition; - final RegionInfo merged = ProtobufUtil.toRegionInfo(transition.getRegionInfo(0)); - final RegionInfo mergeA = ProtobufUtil.toRegionInfo(transition.getRegionInfo(1)); - final RegionInfo mergeB = ProtobufUtil.toRegionInfo(transition.getRegionInfo(2)); - updateRegionMergeTransition(serverName, transition.getTransitionCode(), - merged, mergeA, mergeB); - break; + // we only accept reportRegionStateTransition if the region server is online, see the comment + // above in submitServerCrash method and HBASE-21508 for more details. 
+ if (serverNode.isInState(ServerState.ONLINE)) { + try { + reportRegionStateTransition(builder, serverName, req.getTransitionList()); + } catch (PleaseHoldException e) { + LOG.trace("Failed transition ", e); + throw e; + } catch (UnsupportedOperationException | IOException e) { + // TODO: at the moment we have a single error message and the RS will abort + // if the master says that one of the region transitions failed. + LOG.warn("Failed transition", e); + builder.setErrorMessage("Failed transition " + e.getMessage()); } + } else { + LOG.warn("The region server {} is already dead, skip reportRegionStateTransition call", + serverName); + builder.setErrorMessage("You are dead"); } - } catch (PleaseHoldException e) { - LOG.trace("Failed transition ", e); - throw e; - } catch (UnsupportedOperationException|IOException e) { - // TODO: at the moment we have a single error message and the RS will abort - // if the master says that one of the region transitions failed. - LOG.warn("Failed transition", e); - builder.setErrorMessage("Failed transition " + e.getMessage()); + } finally { + serverNode.readLock().unlock(); } + return builder.build(); } @@ -1017,9 +1051,6 @@ public class AssignmentManager implements ServerListener { } // The Heartbeat tells us of what regions are on the region serve, check the state. 
checkOnlineRegionsReport(serverNode, regionNames); - - // wake report event - wakeServerReportEvent(serverNode); } // just check and output possible inconsistency, without actually doing anything @@ -1061,18 +1092,6 @@ public class AssignmentManager implements ServerListener { } } - protected boolean waitServerReportEvent(ServerName serverName, Procedure proc) { - final ServerStateNode serverNode = regionStates.getOrCreateServer(serverName); - if (serverNode == null) { - LOG.warn("serverName=null; {}", proc); - } - return serverNode.getReportEvent().suspendIfNotReady(proc); - } - - protected void wakeServerReportEvent(final ServerStateNode serverNode) { - serverNode.getReportEvent().wake(getProcedureScheduler()); - } - // ============================================================================================ // RIT chore // ============================================================================================ @@ -1321,13 +1340,27 @@ public class AssignmentManager implements ServerListener { return 0; } - public long submitServerCrash(final ServerName serverName, final boolean shouldSplitWal) { - boolean carryingMeta = isCarryingMeta(serverName); - ProcedureExecutor procExec = this.master.getMasterProcedureExecutor(); - long pid = procExec.submitProcedure(new ServerCrashProcedure(procExec.getEnvironment(), - serverName, shouldSplitWal, carryingMeta)); - LOG.debug("Added=" + serverName - + " to dead servers, submitted shutdown handler to be executed meta=" + carryingMeta); + public long submitServerCrash(ServerName serverName, boolean shouldSplitWal) { + boolean carryingMeta; + long pid; + ServerStateNode serverNode = regionStates.getOrCreateServer(serverName); + // we hold the write lock here for fencing on reportRegionStateTransition. Once we set the + // server state to CRASHED, we will no longer accept the reportRegionStateTransition call from + // this server. 
This is used to simplify the implementation for TRSP and SCP, where we can make + // sure that, the region list fetched by SCP will not be changed any more. + serverNode.writeLock().lock(); + try { + serverNode.setState(ServerState.CRASHED); + carryingMeta = isCarryingMeta(serverName); + ProcedureExecutor procExec = this.master.getMasterProcedureExecutor(); + pid = procExec.submitProcedure(new ServerCrashProcedure(procExec.getEnvironment(), serverName, + shouldSplitWal, carryingMeta)); + } finally { + serverNode.writeLock().unlock(); + } + LOG.info( + "Added {} to dead servers which carryingMeta={}, submitted ServerCrashProcedure pid={}", + serverName, carryingMeta, pid); return pid; } @@ -1847,22 +1880,6 @@ public class AssignmentManager implements ServerListener { .collect(Collectors.toList()); } - // ============================================================================================ - // Server Helpers - // ============================================================================================ - @Override - public void serverAdded(final ServerName serverName) { - } - - @Override - public void serverRemoved(final ServerName serverName) { - final ServerStateNode serverNode = regionStates.getServerNode(serverName); - if (serverNode == null) return; - - // just in case, wake procedures waiting for this server report - wakeServerReportEvent(serverNode); - } - @VisibleForTesting MasterServices getMaster() { return master; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStates.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStates.java index 2b9c0bdccc3..7b854095e3c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStates.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStates.java @@ -357,22 +357,6 @@ public class RegionStates { ((hri.isOffline() || hri.isSplit()) && offline); } - /** - * Returns the set of 
regions hosted by the specified server - * @param serverName the server we are interested in - * @return set of RegionInfo hosted by the specified server - */ - public List getServerRegionInfoSet(final ServerName serverName) { - ServerStateNode serverInfo = getServerNode(serverName); - if (serverInfo == null) { - return Collections.emptyList(); - } - - synchronized (serverInfo) { - return serverInfo.getRegionInfoList(); - } - } - // ============================================================================================ // Split helpers // These methods will only be called in ServerCrashProcedure, and at the end of SCP we will remove diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/ServerState.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/ServerState.java index 6925c42307e..3efe6e22662 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/ServerState.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/ServerState.java @@ -29,6 +29,11 @@ enum ServerState { */ ONLINE, + /** + * Indicate that the server has crashed, i.e., we have already scheduled a SCP for it. + */ + CRASHED, + /** * Only server which carries meta can have this state. We will split wal for meta and then * assign meta first before splitting other wals. 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/ServerStateNode.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/ServerStateNode.java index 204221427e7..6f763aad7c2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/ServerStateNode.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/ServerStateNode.java @@ -17,12 +17,15 @@ */ package org.apache.hadoop.hbase.master.assignment; -import java.util.ArrayList; +import java.util.List; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.stream.Collectors; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.client.RegionInfo; -import org.apache.hadoop.hbase.procedure2.ProcedureEvent; import org.apache.yetus.audience.InterfaceAudience; /** @@ -31,23 +34,16 @@ import org.apache.yetus.audience.InterfaceAudience; @InterfaceAudience.Private class ServerStateNode implements Comparable { - private static final class ServerReportEvent extends ProcedureEvent { - public ServerReportEvent(final ServerName serverName) { - super(serverName); - } - } - - private final ServerReportEvent reportEvent; - private final Set regions; private final ServerName serverName; + private final ReadWriteLock lock = new ReentrantReadWriteLock(); + private volatile ServerState state = ServerState.ONLINE; - public ServerStateNode(final ServerName serverName) { + public ServerStateNode(ServerName serverName) { this.serverName = serverName; this.regions = ConcurrentHashMap.newKeySet(); - this.reportEvent = new ServerReportEvent(serverName); } public ServerName getServerName() { @@ -58,10 +54,6 @@ class ServerStateNode implements Comparable { return state; } - public ProcedureEvent getReportEvent() { - return reportEvent; - } - public 
boolean isInState(final ServerState... expected) { boolean expectedState = false; if (expected != null) { @@ -76,20 +68,17 @@ class ServerStateNode implements Comparable { this.state = state; } - public Set getRegions() { - return regions; - } - public int getRegionCount() { return regions.size(); } - public ArrayList getRegionInfoList() { - ArrayList hris = new ArrayList(regions.size()); - for (RegionStateNode region : regions) { - hris.add(region.getRegionInfo()); - } - return hris; + public List getRegionInfoList() { + return regions.stream().map(RegionStateNode::getRegionInfo).collect(Collectors.toList()); + } + + public List getSystemRegionInfoList() { + return regions.stream().filter(RegionStateNode::isSystemTable) + .map(RegionStateNode::getRegionInfo).collect(Collectors.toList()); } public void addRegion(final RegionStateNode regionNode) { @@ -100,6 +89,14 @@ class ServerStateNode implements Comparable { this.regions.remove(regionNode); } + public Lock readLock() { + return lock.readLock(); + } + + public Lock writeLock() { + return lock.writeLock(); + } + @Override public int compareTo(final ServerStateNode other) { return getServerName().compareTo(other.getServerName()); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/TransitRegionStateProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/TransitRegionStateProcedure.java index 90ebf7b1a59..0885a6a9a8b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/TransitRegionStateProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/TransitRegionStateProcedure.java @@ -500,11 +500,9 @@ public class TransitRegionStateProcedure case REGION_STATE_TRANSITION_CONFIRM_CLOSED: case REGION_STATE_TRANSITION_CONFIRM_OPENED: // for these 3 states, the region may still be online on the crashed server - if (serverName.equals(regionNode.getRegionLocation())) { - 
env.getAssignmentManager().regionClosed(regionNode, false); - if (currentState != RegionStateTransitionState.REGION_STATE_TRANSITION_CLOSE) { - regionNode.getProcedureEvent().wake(env.getProcedureScheduler()); - } + env.getAssignmentManager().regionClosed(regionNode, false); + if (currentState != RegionStateTransitionState.REGION_STATE_TRANSITION_CLOSE) { + regionNode.getProcedureEvent().wake(env.getProcedureScheduler()); } break; default: diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedureSyncWait.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedureSyncWait.java index c8ff9f80452..9353124f92d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedureSyncWait.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedureSyncWait.java @@ -61,52 +61,62 @@ public final class ProcedureSyncWait { } private static class ProcedureFuture implements Future { - private final ProcedureExecutor procExec; - private final Procedure proc; + private final ProcedureExecutor procExec; + private final Procedure proc; - private boolean hasResult = false; - private byte[] result = null; + private boolean hasResult = false; + private byte[] result = null; - public ProcedureFuture(ProcedureExecutor procExec, Procedure proc) { - this.procExec = procExec; - this.proc = proc; + public ProcedureFuture(ProcedureExecutor procExec, Procedure proc) { + this.procExec = procExec; + this.proc = proc; + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + return false; + } + + @Override + public boolean isCancelled() { + return false; + } + + @Override + public boolean isDone() { + return hasResult; + } + + @Override + public byte[] get() throws InterruptedException, ExecutionException { + if (hasResult) { + return result; } - - @Override - public boolean cancel(boolean mayInterruptIfRunning) { return false; } - - @Override - public boolean 
isCancelled() { return false; } - - @Override - public boolean isDone() { return hasResult; } - - @Override - public byte[] get() throws InterruptedException, ExecutionException { - if (hasResult) return result; - try { - return waitForProcedureToComplete(procExec, proc, Long.MAX_VALUE); - } catch (Exception e) { - throw new ExecutionException(e); - } - } - - @Override - public byte[] get(long timeout, TimeUnit unit) - throws InterruptedException, ExecutionException, TimeoutException { - if (hasResult) return result; - try { - result = waitForProcedureToComplete(procExec, proc, unit.toMillis(timeout)); - hasResult = true; - return result; - } catch (TimeoutIOException e) { - throw new TimeoutException(e.getMessage()); - } catch (Exception e) { - throw new ExecutionException(e); - } + try { + return waitForProcedureToComplete(procExec, proc, Long.MAX_VALUE); + } catch (Exception e) { + throw new ExecutionException(e); } } + @Override + public byte[] get(long timeout, TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException { + if (hasResult) { + return result; + } + try { + result = waitForProcedureToComplete(procExec, proc, unit.toMillis(timeout)); + hasResult = true; + return result; + } catch (TimeoutIOException e) { + throw new TimeoutException(e.getMessage()); + } catch (Exception e) { + throw new ExecutionException(e); + } + } + } + public static Future submitProcedure(final ProcedureExecutor procExec, final Procedure proc) { if (proc.isInitializing()) { @@ -124,9 +134,8 @@ public final class ProcedureSyncWait { } public static byte[] waitForProcedureToCompleteIOE( - final ProcedureExecutor procExec, - final Procedure proc, final long timeout) - throws IOException { + final ProcedureExecutor procExec, final Procedure proc, + final long timeout) throws IOException { try { return waitForProcedureToComplete(procExec, proc, timeout); } catch (IOException e) { @@ -139,7 +148,7 @@ public final class ProcedureSyncWait { public static byte[] 
waitForProcedureToComplete( final ProcedureExecutor procExec, final Procedure proc, final long timeout) throws IOException { - waitFor(procExec.getEnvironment(), "pid=" + proc.getProcId(), + waitFor(procExec.getEnvironment(), timeout, "pid=" + proc.getProcId(), new ProcedureSyncWait.Predicate() { @Override public Boolean evaluate() throws IOException { @@ -171,15 +180,25 @@ public final class ProcedureSyncWait { public static T waitFor(MasterProcedureEnv env, String purpose, Predicate predicate) throws IOException { - final Configuration conf = env.getMasterConfiguration(); - final long waitTime = conf.getLong("hbase.master.wait.on.region", 5 * 60 * 1000); - final long waitingTimeForEvents = conf.getInt("hbase.master.event.waiting.time", 1000); + Configuration conf = env.getMasterConfiguration(); + long waitTime = conf.getLong("hbase.master.wait.on.region", 5 * 60 * 1000); + return waitFor(env, waitTime, purpose, predicate); + } + + public static T waitFor(MasterProcedureEnv env, long waitTime, String purpose, + Predicate predicate) throws IOException { + Configuration conf = env.getMasterConfiguration(); + long waitingTimeForEvents = conf.getInt("hbase.master.event.waiting.time", 1000); return waitFor(env, waitTime, waitingTimeForEvents, purpose, predicate); } public static T waitFor(MasterProcedureEnv env, long waitTime, long waitingTimeForEvents, String purpose, Predicate predicate) throws IOException { - final long done = EnvironmentEdgeManager.currentTime() + waitTime; + long done = EnvironmentEdgeManager.currentTime() + waitTime; + if (done <= 0) { + // long overflow, usually this means we pass Long.MAX_VALUE as waitTime + done = Long.MAX_VALUE; + } boolean logged = false; do { T result = predicate.evaluate(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java index 1fcc6eb6b97..048bca80023 100644 --- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java @@ -146,7 +146,7 @@ public class ServerCrashProcedure break; case SERVER_CRASH_GET_REGIONS: this.regionsOnCrashedServer = - services.getAssignmentManager().getRegionStates().getServerRegionInfoSet(serverName); + services.getAssignmentManager().getRegionsOnServer(serverName); // Where to go next? Depends on whether we should split logs at all or // if we should do distributed log splitting. if (!this.shouldSplitWal) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin2.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin2.java index 4ab1a8fbee0..97c7b9b5798 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin2.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin2.java @@ -713,12 +713,12 @@ public class TestAdmin2 { assertEquals(3, clusterRegionServers.size()); HashMap> serversToDecommssion = new HashMap<>(); - // Get a server that has regions. We will decommission two of the servers, + // Get a server that has meta online. We will decommission two of the servers, // leaving one online. 
int i; for (i = 0; i < clusterRegionServers.size(); i++) { List regionsOnServer = admin.getRegions(clusterRegionServers.get(i)); - if (regionsOnServer.size() > 0) { + if (admin.getRegions(clusterRegionServers.get(i)).stream().anyMatch(p -> p.isMetaRegion())) { serversToDecommssion.put(clusterRegionServers.get(i), regionsOnServer); break; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/MockMasterServices.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/MockMasterServices.java index 5a1f87d6f33..56467cc6d6c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/MockMasterServices.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/MockMasterServices.java @@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.master.assignment; import static org.mockito.ArgumentMatchers.any; import java.io.IOException; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.NavigableMap; @@ -28,7 +27,6 @@ import java.util.SortedSet; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.CoordinatedStateManager; -import org.apache.hadoop.hbase.ServerLoad; import org.apache.hadoop.hbase.ServerMetricsBuilder; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableDescriptors; @@ -51,7 +49,6 @@ import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory; import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants; import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; import org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher; -import org.apache.hadoop.hbase.procedure2.Procedure; import org.apache.hadoop.hbase.procedure2.ProcedureEvent; import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility; @@ -93,19 +90,15 @@ public class MockMasterServices extends 
MockNoopMasterServices { private final ClusterConnection connection; private final LoadBalancer balancer; private final ServerManager serverManager; - // Set of regions on a 'server'. Populated externally. Used in below faking 'cluster'. - private final NavigableMap> regionsToRegionServers; - private final ProcedureEvent initialized = new ProcedureEvent("master initialized"); + private final ProcedureEvent initialized = new ProcedureEvent<>("master initialized"); public static final String DEFAULT_COLUMN_FAMILY_NAME = "cf"; public static final ServerName MOCK_MASTER_SERVERNAME = ServerName.valueOf("mockmaster.example.org", 1234, -1L); public MockMasterServices(Configuration conf, - NavigableMap> regionsToRegionServers) - throws IOException { + NavigableMap> regionsToRegionServers) throws IOException { super(conf); - this.regionsToRegionServers = regionsToRegionServers; Superusers.initialize(conf); this.fileSystemManager = new MasterFileSystem(conf); this.walManager = new MasterWalManager(this); @@ -120,15 +113,6 @@ public class MockMasterServices extends MockNoopMasterServices { public boolean isTableDisabled(final TableName tableName) { return false; } - - @Override - protected boolean waitServerReportEvent(ServerName serverName, Procedure proc) { - // Make a report with current state of the server 'serverName' before we call wait.. - SortedSet regions = regionsToRegionServers.get(serverName); - getAssignmentManager().reportOnlineRegions(serverName, - regions == null ? 
new HashSet() : regions); - return super.waitServerReportEvent(serverName, proc); - } }; this.balancer = LoadBalancerFactory.getLoadBalancer(conf); this.serverManager = new ServerManager(this); @@ -176,7 +160,7 @@ public class MockMasterServices extends MockNoopMasterServices { this.assignmentManager.start(); for (int i = 0; i < numServes; ++i) { ServerName sn = ServerName.valueOf("localhost", 100 + i, 1); - serverManager.regionServerReport(sn, new ServerLoad(ServerMetricsBuilder.of(sn))); + serverManager.regionServerReport(sn, ServerMetricsBuilder.of(sn)); } this.procedureExecutor.getEnvironment().setEventReady(initialized, true); } @@ -202,7 +186,7 @@ public class MockMasterServices extends MockNoopMasterServices { return; } ServerName sn = ServerName.valueOf(serverName.getAddress().toString(), startCode); - serverManager.regionServerReport(sn, new ServerLoad(ServerMetricsBuilder.of(sn))); + serverManager.regionServerReport(sn, ServerMetricsBuilder.of(sn)); } @Override @@ -260,7 +244,7 @@ public class MockMasterServices extends MockNoopMasterServices { } @Override - public ProcedureEvent getInitializedEvent() { + public ProcedureEvent getInitializedEvent() { return this.initialized; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManager.java index 94963a04cef..5ec7cc64e4e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManager.java @@ -27,7 +27,6 @@ import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.client.RegionInfoBuilder; -import org.apache.hadoop.hbase.exceptions.UnexpectedStateException; import org.apache.hadoop.hbase.master.RegionState.State; 
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility; import org.apache.hadoop.hbase.procedure2.util.StringUtils; @@ -49,14 +48,6 @@ public class TestAssignmentManager extends TestAssignmentManagerBase { private static final Logger LOG = LoggerFactory.getLogger(TestAssignmentManager.class); - @Test(expected = NullPointerException.class) - public void testWaitServerReportEventWithNullServer() throws UnexpectedStateException { - // Test what happens if we pass in null server. I'd expect it throws NPE. - if (this.am.waitServerReportEvent(null, null)) { - throw new UnexpectedStateException(); - } - } - @Test public void testAssignWithGoodExec() throws Exception { // collect AM metrics before test diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManagerBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManagerBase.java index 5f5a5766d28..7ab37bc4214 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManagerBase.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManagerBase.java @@ -186,7 +186,7 @@ public abstract class TestAssignmentManagerBase { protected byte[] waitOnFuture(final Future future) throws Exception { try { - return future.get(5, TimeUnit.SECONDS); + return future.get(60, TimeUnit.SECONDS); } catch (ExecutionException e) { LOG.info("ExecutionException", e); Exception ee = (Exception) e.getCause(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestReportRegionStateTransitionFromDeadServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestReportRegionStateTransitionFromDeadServer.java new file mode 100644 index 00000000000..6c9e5eb34ba --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestReportRegionStateTransitionFromDeadServer.java @@ -0,0 +1,201 @@ +/** + * 
Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.assignment; + +import static org.junit.Assert.assertNotEquals; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseIOException; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.PleaseHoldException; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.master.HMaster; +import org.apache.hadoop.hbase.master.MasterServices; +import org.apache.hadoop.hbase.master.RegionPlan; +import org.apache.hadoop.hbase.master.ServerManager; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.testclassification.MasterTests; +import 
org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.zookeeper.KeeperException;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse;
+
+/**
+ * Test for HBASE-21508: a region state transition report that the master processes only after the
+ * reporting region server has been aborted must not leave the region open on two servers.
+ * <p>
+ * The test starts a move of a region from rs0 to rs1, holds the first non-meta
+ * {@code reportRegionStateTransition} call inside the master, aborts rs0, and releases the held
+ * report only once {@link AssignmentManager#getRegionsOnServer(ServerName)} has been entered
+ * (presumably by the server crash procedure for rs0). At the end the region must be hosted by
+ * exactly one of rs1 and rs2.
+ */
+@Category({ MasterTests.class, MediumTests.class })
+public class TestReportRegionStateTransitionFromDeadServer {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestReportRegionStateTransitionFromDeadServer.class);
+
+  /** Exclusion list that {@link ServerManagerForTest} passes on when destinations are computed. */
+  private static final List<ServerName> EXCLUDE_SERVERS = new ArrayList<>();
+
+  // Latch pairs used to stop master side calls at a known point. They are assigned by the test
+  // thread and read by the threads that execute the overridden AssignmentManager methods, hence
+  // volatile. The RESUME_* latch of a pair is always assigned before its ARRIVE_* latch, so a
+  // thread that observes a non-null ARRIVE_* latch also observes the matching RESUME_* latch.
+  private static volatile CountDownLatch ARRIVE_GET_REGIONS;
+  private static volatile CountDownLatch RESUME_GET_REGIONS;
+  private static volatile CountDownLatch ARRIVE_REPORT;
+  private static volatile CountDownLatch RESUME_REPORT;
+
+  /**
+   * Blocks until {@code latch} reaches zero. If the waiting thread is interrupted the wait ends
+   * early and the interrupt status is set again, so that the caller can still observe it.
+   * @param latch the latch to wait on
+   */
+  private static void awaitKeepingInterrupt(CountDownLatch latch) {
+    try {
+      latch.await();
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  /** ServerManager whose no-arg destination list always excludes {@link #EXCLUDE_SERVERS}. */
+  private static final class ServerManagerForTest extends ServerManager {
+
+    public ServerManagerForTest(MasterServices master) {
+      super(master);
+    }
+
+    @Override
+    public List<ServerName> createDestinationServersList() {
+      return super.createDestinationServersList(EXCLUDE_SERVERS);
+    }
+  }
+
+  /** AssignmentManager that can be paused by the test through the static latches. */
+  private static final class AssignmentManagerForTest extends AssignmentManager {
+
+    public AssignmentManagerForTest(MasterServices master) {
+      super(master);
+    }
+
+    /**
+     * Delegates to the parent first and then, once the test has armed the latches, signals
+     * {@code ARRIVE_GET_REGIONS} and waits for {@code RESUME_GET_REGIONS} before returning the
+     * list that was computed up front.
+     */
+    @Override
+    public List<RegionInfo> getRegionsOnServer(ServerName serverName) {
+      List<RegionInfo> regions = super.getRegionsOnServer(serverName);
+      CountDownLatch arrive = ARRIVE_GET_REGIONS;
+      if (arrive != null) {
+        arrive.countDown();
+        awaitKeepingInterrupt(RESUME_GET_REGIONS);
+      }
+      return regions;
+    }
+
+    /**
+     * Holds a report back until {@code RESUME_REPORT} is released, provided the test has armed
+     * the latches and none of the reported transitions is for the meta region. The report is
+     * handed to the parent afterwards in every case.
+     */
+    @Override
+    public ReportRegionStateTransitionResponse reportRegionStateTransition(
+        ReportRegionStateTransitionRequest req) throws PleaseHoldException {
+      CountDownLatch arrive = ARRIVE_REPORT;
+      if (arrive != null && req.getTransitionList().stream()
+        .noneMatch(t -> ProtobufUtil.toRegionInfo(t.getRegionInfo(0)).isMetaRegion())) {
+        arrive.countDown();
+        awaitKeepingInterrupt(RESUME_REPORT);
+      }
+      return super.reportRegionStateTransition(req);
+    }
+  }
+
+  /** HMaster that plugs in the test versions of AssignmentManager and ServerManager. */
+  public static final class HMasterForTest extends HMaster {
+
+    public HMasterForTest(Configuration conf) throws IOException, KeeperException {
+      super(conf);
+    }
+
+    @Override
+    protected AssignmentManager createAssignmentManager(MasterServices master) {
+      return new AssignmentManagerForTest(master);
+    }
+
+    @Override
+    protected ServerManager createServerManager(MasterServices master) throws IOException {
+      setupClusterConnection();
+      return new ServerManagerForTest(master);
+    }
+  }
+
+  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+  private static final TableName NAME = TableName.valueOf("Report");
+
+  private static final byte[] CF = Bytes.toBytes("cf");
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    UTIL.getConfiguration().setClass(HConstants.MASTER_IMPL, HMasterForTest.class, HMaster.class);
+    UTIL.getConfiguration().setInt("hbase.regionserver.msginterval", 1000);
+    UTIL.startMiniCluster(3);
+    UTIL.getAdmin().balancerSwitch(false, true);
+    UTIL.createTable(NAME, CF);
+    UTIL.waitTableAvailable(NAME);
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    UTIL.shutdownMiniCluster();
+  }
+
+  @Test
+  public void test() throws HBaseIOException, InterruptedException, ExecutionException {
+    RegionInfo region = UTIL.getMiniHBaseCluster().getRegions(NAME).get(0).getRegionInfo();
+    AssignmentManager am = UTIL.getMiniHBaseCluster().getMaster().getAssignmentManager();
+    RegionStateNode rsn = am.getRegionStates().getRegionStateNode(region);
+
+    // move from rs0 to rs1, and then kill rs0. Later add rs1 to exclude servers, and at last verify
+    // that the region should not be on rs1 and rs2 both.
+    HRegionServer rs0 = UTIL.getMiniHBaseCluster().getRegionServer(rsn.getRegionLocation());
+    HRegionServer rs1 = UTIL.getOtherRegionServer(rs0);
+    HRegionServer rs2 = UTIL.getMiniHBaseCluster().getRegionServerThreads().stream()
+      .map(t -> t.getRegionServer()).filter(rs -> rs != rs0 && rs != rs1).findAny()
+      .orElseThrow(() -> new AssertionError("no region server other than rs0 and rs1 found"));
+
+    // Arm the report latches (RESUME before ARRIVE) and wait until a report is being held.
+    RESUME_REPORT = new CountDownLatch(1);
+    ARRIVE_REPORT = new CountDownLatch(1);
+    Future<?> future =
+      am.moveAsync(new RegionPlan(region, rs0.getServerName(), rs1.getServerName()));
+    ARRIVE_REPORT.await();
+
+    // Arm the getRegionsOnServer latches (RESUME before ARRIVE) and then abort rs0.
+    RESUME_GET_REGIONS = new CountDownLatch(1);
+    ARRIVE_GET_REGIONS = new CountDownLatch(1);
+    rs0.abort("For testing!");
+
+    // Release the held report only after getRegionsOnServer has been entered.
+    ARRIVE_GET_REGIONS.await();
+    RESUME_REPORT.countDown();
+
+    try {
+      future.get(15, TimeUnit.SECONDS);
+    } catch (TimeoutException e) {
+      // after the fix in HBASE-21508 we will get this exception as the TRSP can not be finished any
+      // more before SCP interrupts it. It's OK.
+    }
+
+    // Exclude rs1 before the paused getRegionsOnServer call is allowed to return.
+    EXCLUDE_SERVERS.add(rs1.getServerName());
+    RESUME_GET_REGIONS.countDown();
+    // wait until there are no running procedures, no SCP and no TRSP
+    UTIL.waitFor(30000, () -> UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor()
+      .getActiveProcIds().isEmpty());
+    boolean onRS1 = !rs1.getRegions(NAME).isEmpty();
+    boolean onRS2 = !rs2.getRegions(NAME).isEmpty();
+    // The two flags must differ: the region is on exactly one of rs1 and rs2.
+    assertNotEquals(
+      "should either be on rs1 or rs2, but onRS1 is " + onRS1 + " and on RS2 is " + onRS2, onRS1,
+      onRS2);
+  }
+}