HBASE-20634 Reopen region while server crash can cause the procedure to be stuck

A reattempt at fixing HBASE-20173 [AMv2] DisableTableProcedure concurrent to ServerCrashProcedure can deadlock

The scenario: an SCP, after processing WALs, goes to assign the regions
that were on the crashed server, but a concurrent procedure gets in there
first and tries to unassign a region that was on the crashed server
(it could be part of a move procedure, a disable table, etc.). The
unassign happens to run AFTER the SCP has released all RPCs that
were going against the crashed server. The unassign fails because the
server is crashed, and it then suspends itself, but it will never be
woken up because the server it was going against has already been
processed. Worse, the SCP can make no progress either, because the
suspended unassign holds the lock on a region that the SCP wants to
assign.

Here we teach the unassign to recognize the state where it is running
post-SCP cleanup of RPCs. If that state is present, the unassign moves to
finish instead of suspending itself.

Includes a nice unit test by Duo Zhang that reproduces the hung scenario.

M hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/FailedRemoteDispatchException.java
 Moved this class back to hbase-procedure where it belongs.

M hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/NoNodeDispatchException.java
M hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/NoServerDispatchException.java
M hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/NullTargetServerDispatchException.java
 Specializations of FRDE so we can be more particular when we say there
 was a problem.

M hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureDispatcher.java
 Change addOperationToNode so we throw exceptions that give more detail
 on the issue rather than a mysterious true/false.
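 The boolean-to-exception shift can be illustrated with a minimal standalone
 sketch. The classes below are toy stand-ins for illustration only (they model,
 but are not, the real RemoteProcedureDispatcher and the new exception types):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Toy model of the HBASE-20634 change to addOperationToNode: instead of
 * returning an opaque true/false, the add throws a typed exception naming
 * the exact failure, so the caller can react precisely.
 */
public class DispatchSketch {
  /** Stand-in for FailedRemoteDispatchException and its new subclasses. */
  static class FailedRemoteDispatchException extends Exception {
    FailedRemoteDispatchException(String msg) { super(msg); }
  }
  static class NullTargetServerDispatchException extends FailedRemoteDispatchException {
    NullTargetServerDispatchException(String msg) { super(msg); }
  }
  static class NoServerDispatchException extends FailedRemoteDispatchException {
    NoServerDispatchException(String msg) { super(msg); }
  }

  private final Map<String, StringBuilder> nodeMap = new ConcurrentHashMap<>();

  void registerServer(String serverName) {
    nodeMap.put(serverName, new StringBuilder());
  }

  /** Old style returned false for every failure; new style says which failure. */
  void addOperationToNode(String serverName, String op)
      throws FailedRemoteDispatchException {
    if (serverName == null) {
      // Target could have been nulled by a concurrent crash handler.
      throw new NullTargetServerDispatchException(op);
    }
    StringBuilder node = nodeMap.get(serverName);
    if (node == null) {
      // Node removed because the server crashed/expired.
      throw new NoServerDispatchException(serverName + "; " + op);
    }
    node.append(op);
  }

  public static void main(String[] args) throws Exception {
    DispatchSketch d = new DispatchSketch();
    d.registerServer("rs1");
    d.addOperationToNode("rs1", "closeRegion"); // succeeds silently
    try {
      d.addOperationToNode("rs2", "closeRegion");
      throw new AssertionError("expected NoServerDispatchException");
    } catch (NoServerDispatchException expected) {
      // Caller now knows the server node is gone, not just "false".
    }
  }
}
```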

M hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto
 Undo SERVER_CRASH_HANDLE_RIT2. Bad idea (from HBASE-20173)

M hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
 Have expireServer return true if it actually queued an expiration. Used
 later in this patch.

M hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java
 Hide methods that shouldn't be public. Add a particular check used in
 unassign procedure failure processing.

M hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/MoveRegionProcedure.java
 Check that server we're to move from is actually online (might
 catch a few silly move requests early).

M hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStates.java
 Add doc on ServerState; it wasn't really being used. Now we actually
 stamp a Server OFFLINE after its WALs have been split, which means it is
 safe to assign since all WALs have been processed. Add methods to update
 to SPLITTING and to set OFFLINE after splitting is done.
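 As a rough standalone model of the lifecycle described above (toy class and
 method names, not the real RegionStates/AssignmentManager): a server goes
 ONLINE -> SPLITTING when WAL splitting starts -> OFFLINE when splitting is
 done, and only then (or when its node is already cleaned up) is it safe to
 let a stuck unassign proceed.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Toy model of the ServerState transitions added/documented by this patch. */
public class ServerStateSketch {
  enum ServerState { ONLINE, SPLITTING, OFFLINE }

  private final Map<String, ServerState> servers = new ConcurrentHashMap<>();

  void register(String serverName) { servers.put(serverName, ServerState.ONLINE); }

  /** Called when we start splitting the crashed server's WALs. */
  void logSplitting(String serverName) { servers.put(serverName, ServerState.SPLITTING); }

  /** Called after all of the crashed server's WALs have been split. */
  void logSplit(String serverName) { servers.put(serverName, ServerState.OFFLINE); }

  /** True if the server node is gone (cleaned up) or marked OFFLINE. */
  boolean isDeadServerProcessed(String serverName) {
    ServerState s = servers.get(serverName);
    return s == null || s == ServerState.OFFLINE;
  }

  public static void main(String[] args) {
    ServerStateSketch rs = new ServerStateSketch();
    rs.register("rs1");
    rs.logSplitting("rs1");
    // WALs still splitting: not safe to finish an unassign yet.
    if (rs.isDeadServerProcessed("rs1")) throw new AssertionError();
    rs.logSplit("rs1");
    // All WALs processed: safe to proceed.
    if (!rs.isDeadServerProcessed("rs1")) throw new AssertionError();
  }
}
```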

M hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionTransitionProcedure.java
 Change logging to the new style and make it less repetitive.
 Cater to the new way in which .addOperationToNode returns info (exceptions
 rather than true/false).

M hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/UnassignProcedure.java
 Add handling for the case where the unassign failed AND we should not
 suspend, because we would never be woken up: the SCP is past the point
 where it fails outstanding RPCs.

 Some cleanup of the failure processing grouping where we can proceed.

 TODOs have been handled in this refactor, including the TODO that
 wonders whether it is possible that concurrent failures come in
 (Yes).
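 The refactored failure handling can be summarized as a small decision table.
 This is a toy sketch with local stand-in exception classes (illustrative
 assumptions, not the real UnassignProcedure or HBase exception types):

```java
import java.io.IOException;

/**
 * Toy decision table for the refactored remoteCallFailed handling: some
 * failures mean it is safe to finish the unassign, some mean stay suspended
 * until the ServerCrashProcedure wakes us, and anything else triggers an
 * expireServer attempt (with the new isDeadServerProcessed fallback for the
 * rare already-expired case).
 */
public class UnassignFailureSketch {
  // Stand-ins for the real exception types, for illustration only.
  static class ServerCrashException extends IOException {}
  static class NotServingRegionException extends IOException {}
  static class RegionServerStoppedException extends IOException {}

  enum Outcome { PROCEED, STAY_SUSPENDED, EXPIRE_SERVER }

  static Outcome onRemoteCallFailed(IOException e) {
    if (e instanceof ServerCrashException || e instanceof NotServingRegionException) {
      // Safe: SCP is past log splitting, or the region is already closed.
      return Outcome.PROCEED;
    }
    if (e instanceof RegionServerStoppedException) {
      // RS is stopping; the region may need WAL recovery. Wait for SCP.
      return Outcome.STAY_SUSPENDED;
    }
    // Unknown RPC failure: try to expire the server. If the expire does not
    // queue and the dead server is fully processed, it is safe to proceed.
    return Outcome.EXPIRE_SERVER;
  }

  public static void main(String[] args) {
    if (onRemoteCallFailed(new ServerCrashException()) != Outcome.PROCEED)
      throw new AssertionError();
    if (onRemoteCallFailed(new RegionServerStoppedException()) != Outcome.STAY_SUSPENDED)
      throw new AssertionError();
    if (onRemoteCallFailed(new IOException()) != Outcome.EXPIRE_SERVER)
      throw new AssertionError();
  }
}
```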

M hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java
 Doc and removing the old HBASE-20173 'fix'.
 Also updating ServerStateNode post WAL splitting so it gets marked
 OFFLINE.

A hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestServerCrashProcedureStuck.java
 Nice test by Duo Zhang.

Signed-off-by: Umesh Agashe <uagashe@cloudera.com>
Signed-off-by: Duo Zhang <palomino219@gmail.com>
Signed-off-by: Mike Drob <mdrob@apache.org>
zhangduo 2018-05-27 20:42:21 +08:00 committed by Michael Stack
parent 08446916a0
commit a472f24d17
15 changed files with 484 additions and 98 deletions


@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information


@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.assignment;
package org.apache.hadoop.hbase.procedure2;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.yetus.audience.InterfaceAudience;


@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.procedure2;
import org.apache.yetus.audience.InterfaceAudience;
/**
* Used internally signaling failed queue of a remote procedure operation.
* In particular, no dispatch Node was found for the passed server name
* key AFTER queuing dispatch.
*/
@SuppressWarnings("serial")
@InterfaceAudience.Private
public class NoNodeDispatchException extends FailedRemoteDispatchException {
public NoNodeDispatchException(String msg) {
super(msg);
}
}


@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.procedure2;
import org.apache.yetus.audience.InterfaceAudience;
/**
* Used internally signaling failed queue of a remote procedure operation.
* In particular, no dispatch Node was found for the passed server name
* key.
*/
@SuppressWarnings("serial")
@InterfaceAudience.Private
public class NoServerDispatchException extends FailedRemoteDispatchException {
public NoServerDispatchException(String msg) {
super(msg);
}
}


@ -0,0 +1,32 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.procedure2;
import org.apache.yetus.audience.InterfaceAudience;
/**
* Used internally signaling failed queue of a remote procedure operation.
* The target server passed is null.
*/
@SuppressWarnings("serial")
@InterfaceAudience.Private
public class NullTargetServerDispatchException extends FailedRemoteDispatchException {
public NullTargetServerDispatchException(String msg) {
super(msg);
}
}


@ -162,24 +162,25 @@ public abstract class RemoteProcedureDispatcher<TEnv, TRemote extends Comparable
}
/**
* Add a remote rpc. Be sure to check result for successful add.
* Add a remote rpc.
* @param key the node identifier
* @return True if we successfully added the operation.
*/
public boolean addOperationToNode(final TRemote key, RemoteProcedure rp) {
public void addOperationToNode(final TRemote key, RemoteProcedure rp)
throws NullTargetServerDispatchException, NoServerDispatchException, NoNodeDispatchException {
if (key == null) {
// Key is remote server name. Be careful. It could have been nulled by a concurrent
// ServerCrashProcedure shutting down outstanding RPC requests. See remoteCallFailed.
return false;
throw new NullTargetServerDispatchException(rp.toString());
}
assert key != null : "found null key for node";
BufferNode node = nodeMap.get(key);
if (node == null) {
return false;
// If null here, it means node has been removed because it crashed. This happens when server
// is expired in ServerManager. ServerCrashProcedure may or may not have run.
throw new NoServerDispatchException(key.toString() + "; " + rp.toString());
}
node.add(rp);
// Check our node still in the map; could have been removed by #removeNode.
return nodeMap.containsValue(node);
if (!nodeMap.containsValue(node)) {
throw new NoNodeDispatchException(key.toString() + "; " + rp.toString());
}
}
/**


@ -557,29 +557,34 @@ public class ServerManager {
/*
* Expire the passed server. Add it to list of dead servers and queue a
* shutdown processing.
* @return True if we queued a ServerCrashProcedure else false if we did not (could happen
* for many reasons including the fact that its this server that is going down or we already
* have queued an SCP for this server or SCP processing is currently disabled because we are
* in startup phase).
*/
public synchronized void expireServer(final ServerName serverName) {
public synchronized boolean expireServer(final ServerName serverName) {
// THIS server is going down... can't handle our own expiration.
if (serverName.equals(master.getServerName())) {
if (!(master.isAborted() || master.isStopped())) {
master.stop("We lost our znode?");
}
return;
return false;
}
// No SCP handling during startup.
if (!master.isServerCrashProcessingEnabled()) {
LOG.info("Master doesn't enable ServerShutdownHandler during initialization, "
+ "delay expiring server " + serverName);
// Even we delay expire this server, we still need to handle Meta's RIT
// Even though we delay expire of this server, we still need to handle Meta's RIT
// that are against the crashed server; since when we do RecoverMetaProcedure,
// the SCP is not enable yet and Meta's RIT may be suspend forever. See HBase-19287
// the SCP is not enabled yet and Meta's RIT may be suspend forever. See HBase-19287
master.getAssignmentManager().handleMetaRITOnCrashedServer(serverName);
this.queuedDeadServers.add(serverName);
return;
// Return true: though no SCP was queued here, one will be queued later.
return true;
}
if (this.deadservers.isDeadServer(serverName)) {
// TODO: Can this happen? It shouldn't be online in this case?
LOG.warn("Expiration of " + serverName +
" but server shutdown already in progress");
return;
LOG.warn("Expiration called on {} but crash processing already in progress", serverName);
return false;
}
moveFromOnlineToDeadServers(serverName);
@ -591,7 +596,7 @@ public class ServerManager {
if (this.onlineServers.isEmpty()) {
master.stop("Cluster shutdown set; onlineServer=0");
}
return;
return false;
}
LOG.info("Processing expiration of " + serverName + " on " + this.master.getServerName());
master.getAssignmentManager().submitServerCrash(serverName, true);
@ -602,6 +607,7 @@ public class ServerManager {
listener.serverRemoved(serverName);
}
}
return true;
}
@VisibleForTesting


@ -1064,6 +1064,9 @@ public class AssignmentManager implements ServerListener {
protected boolean waitServerReportEvent(final ServerName serverName, final Procedure proc) {
final ServerStateNode serverNode = regionStates.getOrCreateServer(serverName);
if (serverNode == null) {
LOG.warn("serverName=null; {}", proc);
}
return serverNode.getReportEvent().suspendIfNotReady(proc);
}
@ -1903,19 +1906,35 @@ public class AssignmentManager implements ServerListener {
return node != null ? node.getVersionNumber() : 0;
}
public void killRegionServer(final ServerName serverName) {
private void killRegionServer(final ServerName serverName) {
final ServerStateNode serverNode = regionStates.getServerNode(serverName);
killRegionServer(serverNode);
}
public void killRegionServer(final ServerStateNode serverNode) {
/** Don't do this. Messes up accounting. Let ServerCrashProcedure do this.
for (RegionStateNode regionNode: serverNode.getRegions()) {
regionNode.offline();
}*/
private void killRegionServer(final ServerStateNode serverNode) {
master.getServerManager().expireServer(serverNode.getServerName());
}
/**
* This is a very particular check. The {@link org.apache.hadoop.hbase.master.ServerManager} is
* where you go to check on state of 'Servers', what Servers are online, etc. Here we are
* checking the state of a server that is post expiration, a ServerManager function that moves a
* server from online to dead. Here we are seeing if the server has moved beyond a particular
* point in the recovery process such that it is safe to move on with assigns; etc.
* @return True if this Server does not exist or if it does and it is marked as OFFLINE (which
* happens after all WALs have been split on this server making it so assigns, etc. can
* proceed). If null, presumes the ServerStateNode was cleaned up by SCP.
*/
boolean isDeadServerProcessed(final ServerName serverName) {
ServerStateNode ssn = this.regionStates.getServerNode(serverName);
if (ssn == null) {
return true;
}
synchronized (ssn) {
return ssn.isOffline();
}
}
/**
* Handle RIT of meta region against crashed server.
* Only used when ServerCrashProcedure is not enabled.


@ -79,6 +79,9 @@ public class MoveRegionProcedure extends AbstractStateMachineRegionProcedure<Mov
try {
preflightChecks(env, true);
checkOnline(env, this.plan.getRegionInfo());
if (!env.getMasterServices().getServerManager().isServerOnline(this.plan.getSource())) {
throw new HBaseIOException(this.plan.getSource() + " not online");
}
} catch (HBaseIOException e) {
LOG.warn(this.toString() + " FAILED because " + e.toString());
return Flow.NO_MORE_STATE;


@ -308,7 +308,29 @@ public class RegionStates {
}
}
public enum ServerState { ONLINE, SPLITTING, OFFLINE }
/**
* Server State.
*/
public enum ServerState {
/**
* Initial state. Available.
*/
ONLINE,
/**
* Server expired/crashed. Currently undergoing WAL splitting.
*/
SPLITTING,
/**
* WAL splitting done.
*/
OFFLINE
}
/**
* State of Server; list of hosted regions, etc.
*/
public static class ServerStateNode implements Comparable<ServerStateNode> {
private final ServerReportEvent reportEvent;
@ -340,6 +362,10 @@ public class RegionStates {
return reportEvent;
}
public boolean isOffline() {
return this.state.equals(ServerState.OFFLINE);
}
public boolean isInState(final ServerState... expected) {
boolean expectedState = false;
if (expected != null) {
@ -597,17 +623,26 @@ public class RegionStates {
// ============================================================================================
// TODO: split helpers
// ============================================================================================
public void logSplit(final ServerName serverName) {
/**
* Call this when we start log splitting a crashed Server.
* @see #logSplit(ServerName)
*/
public void logSplitting(final ServerName serverName) {
final ServerStateNode serverNode = getOrCreateServer(serverName);
synchronized (serverNode) {
serverNode.setState(ServerState.SPLITTING);
/* THIS HAS TO BE WRONG. THIS IS SPLITTING OF REGION, NOT SPLITTING WALs.
for (RegionStateNode regionNode: serverNode.getRegions()) {
synchronized (regionNode) {
// TODO: Abort procedure if present
regionNode.setState(State.SPLITTING);
}
}*/
}
}
/**
* Called after we've split all logs on a crashed Server.
* @see #logSplitting(ServerName)
*/
public void logSplit(final ServerName serverName) {
final ServerStateNode serverNode = getOrCreateServer(serverName);
synchronized (serverNode) {
serverNode.setState(ServerState.OFFLINE);
}
}
@ -930,6 +965,12 @@ public class RegionStates {
// ==========================================================================
// Servers
// ==========================================================================
/**
* Be judicious calling this method. Do it on server register ONLY, otherwise
* you could mess up online server accounting. TODO: Review usage and convert
* to {@link #getServerNode(ServerName)} where we can.
*/
public ServerStateNode getOrCreateServer(final ServerName serverName) {
ServerStateNode node = serverMap.get(serverName);
if (node == null) {


@ -27,6 +27,7 @@ import org.apache.hadoop.hbase.exceptions.UnexpectedStateException;
import org.apache.hadoop.hbase.master.assignment.RegionStates.RegionStateNode;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.procedure.TableProcedureInterface;
import org.apache.hadoop.hbase.procedure2.FailedRemoteDispatchException;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
@ -215,10 +216,8 @@ public abstract class RegionTransitionProcedure
public void remoteCallFailed(final MasterProcedureEnv env,
final ServerName serverName, final IOException exception) {
final RegionStateNode regionNode = getRegionState(env);
String msg = exception.getMessage() == null? exception.getClass().getSimpleName():
exception.getMessage();
LOG.warn("Remote call failed " + this + "; " + regionNode.toShortString() +
"; exception=" + msg);
LOG.warn("Remote call failed {}; {}; {}; exception={}", serverName,
this, regionNode.toShortString(), exception.getClass().getSimpleName());
if (remoteCallFailed(env, regionNode, exception)) {
// NOTE: This call to wakeEvent puts this Procedure back on the scheduler.
// Thereafter, another Worker can be in here so DO NOT MESS WITH STATE beyond
@ -239,11 +238,7 @@ public abstract class RegionTransitionProcedure
*/
protected boolean addToRemoteDispatcher(final MasterProcedureEnv env,
final ServerName targetServer) {
assert targetServer == null || targetServer.equals(getRegionState(env).getRegionLocation()):
"targetServer=" + targetServer + " getRegionLocation=" +
getRegionState(env).getRegionLocation(); // TODO
LOG.info("Dispatch " + this + "; " + getRegionState(env).toShortString());
LOG.info("Dispatch {}; {}", this, getRegionState(env).toShortString());
// Put this procedure into suspended mode to wait on report of state change
// from remote regionserver. Means Procedure associated ProcedureEvent is marked not 'ready'.
@ -253,9 +248,10 @@ public abstract class RegionTransitionProcedure
// backtrack on stuff like the 'suspend' done above -- tricky as the 'wake' requests us -- and
// ditto up in the caller; it needs to undo state changes. Inside in remoteCallFailed, it does
// wake to undo the above suspend.
if (!env.getRemoteDispatcher().addOperationToNode(targetServer, this)) {
remoteCallFailed(env, targetServer,
new FailedRemoteDispatchException(this + " to " + targetServer));
try {
env.getRemoteDispatcher().addOperationToNode(targetServer, this);
} catch (FailedRemoteDispatchException frde) {
remoteCallFailed(env, targetServer, frde);
return false;
}
return true;


@ -199,10 +199,11 @@ public class UnassignProcedure extends RegionTransitionProcedure {
return false;
}
// Mark the region as CLOSING.
env.getAssignmentManager().markRegionAsClosing(regionNode);
// Add the close region operation the the server dispatch queue.
// Add the close region operation to the server dispatch queue.
if (!addToRemoteDispatcher(env, regionNode.getRegionLocation())) {
// If addToRemoteDispatcher fails, it calls the callback #remoteCallFailed.
}
@ -250,37 +251,76 @@ public class UnassignProcedure extends RegionTransitionProcedure {
}
}
/**
* Our remote call failed but there are a few states where it is safe to proceed with the
* unassign; e.g. if a server crash and it has had all of its WALs processed, then we can allow
* this unassign to go to completion.
* @return True if it is safe to proceed with the unassign.
*/
private boolean isSafeToProceed(final MasterProcedureEnv env, final RegionStateNode regionNode,
final IOException exception) {
if (exception instanceof ServerCrashException) {
// This exception comes from ServerCrashProcedure AFTER log splitting. Its a signaling
// exception. SCP found this region as a RIT during its processing of the crash. Its call
// into here says it is ok to let this procedure go complete.
return true;
}
if (exception instanceof NotServingRegionException) {
LOG.warn("IS OK? ANY LOGS TO REPLAY; ACTING AS THOUGH ALL GOOD {}", regionNode, exception);
return true;
}
return false;
}
/**
* Set it up so when procedure is unsuspended, we'll move to the procedure finish.
*/
protected void proceed(final MasterProcedureEnv env, final RegionStateNode regionNode) {
try {
reportTransition(env, regionNode, TransitionCode.CLOSED, HConstants.NO_SEQNUM);
} catch (UnexpectedStateException e) {
// Should never happen.
throw new RuntimeException(e);
}
}
/**
* @return If true, we will re-wake up this procedure; if false, the procedure stays suspended.
*/
@Override
protected boolean remoteCallFailed(final MasterProcedureEnv env, final RegionStateNode regionNode,
final IOException exception) {
// TODO: Is there on-going rpc to cleanup?
if (exception instanceof ServerCrashException) {
// This exception comes from ServerCrashProcedure AFTER log splitting.
// SCP found this region as a RIT. Its call into here says it is ok to let this procedure go
// complete. This complete will release lock on this region so subsequent action on region
// can succeed; e.g. the assign that follows this unassign when a move (w/o wait on SCP
// the assign could run w/o logs being split so data loss).
try {
reportTransition(env, regionNode, TransitionCode.CLOSED, HConstants.NO_SEQNUM);
} catch (UnexpectedStateException e) {
// Should never happen.
throw new RuntimeException(e);
}
// Be careful reading the below; we return from the middle of the method a few times.
if (isSafeToProceed(env, regionNode, exception)) {
proceed(env, regionNode);
} else if (exception instanceof RegionServerAbortedException ||
exception instanceof RegionServerStoppedException ||
exception instanceof ServerNotRunningYetException) {
// RS is aborting, we cannot offline the region since the region may need to do WAL
// recovery. Until we see the RS expiration, we should retry.
// TODO: This should be suspend like the below where we call expire on server?
exception instanceof RegionServerStoppedException) {
// RS is aborting/stopping, we cannot offline the region since the region may need to do WAL
// recovery. Until we see the RS expiration, stay suspended; return false.
LOG.info("Ignoring; waiting on ServerCrashProcedure", exception);
} else if (exception instanceof NotServingRegionException) {
LOG.info("IS THIS OK? ANY LOGS TO REPLAY; ACTING AS THOUGH ALL GOOD " + regionNode,
exception);
setTransitionState(RegionTransitionState.REGION_TRANSITION_FINISH);
return false;
} else if (exception instanceof ServerNotRunningYetException) {
// This should not happen. If it does, procedure will be woken-up and we'll retry.
// TODO: Needs a pause and backoff?
LOG.info("Retry", exception);
} else {
LOG.warn("Expiring server " + this + "; " + regionNode.toShortString() +
", exception=" + exception);
env.getMasterServices().getServerManager().expireServer(regionNode.getRegionLocation());
// We failed to RPC this server. Set it as expired.
ServerName serverName = regionNode.getRegionLocation();
LOG.warn("Expiring {}, {} {}; exception={}", serverName, this, regionNode.toShortString(),
exception.getClass().getSimpleName());
if (!env.getMasterServices().getServerManager().expireServer(serverName)) {
// Failed to queue an expire. Lots of possible reasons, including that it may already be
// expired. If so, has it moved beyond the state where we would be woken up were we to go
// ahead and suspend the procedure? Look for this rare condition.
if (env.getAssignmentManager().isDeadServerProcessed(serverName)) {
// Its ok to proceed with this unassign.
LOG.info("{} is dead and processed; moving procedure to finished state; {}",
serverName, this);
proceed(env, regionNode);
// Return true; wake up the procedure so we can act on proceed.
return true;
}
}
// Return false so this procedure stays in suspended state. It will be woken up by the
// ServerCrashProcedure that was scheduled when we called #expireServer above. SCP calls
// #handleRIT, which will call this method again, only then the exception will be a ServerCrashException


@ -163,7 +163,11 @@ implements ServerProcedureInterface {
"; cycles=" + getCycles());
}
// Handle RIT against crashed server. Will cancel any ongoing assigns/unassigns.
// Returns list of regions we need to reassign.
// Returns list of regions we need to reassign. NOTE: there is nothing to stop a
// dispatch happening AFTER this point. Check for the condition if a dispatch RPC fails
// inside in AssignProcedure/UnassignProcedure. AssignProcedure just keeps retrying.
// UnassignProcedure is more complicated. See where it does the check by calling
// am#isDeadServerProcessed.
List<RegionInfo> toAssign = handleRIT(env, regionsOnCrashedServer);
AssignmentManager am = env.getAssignmentManager();
// CreateAssignProcedure will try to use the old location for the region deploy.
@ -175,9 +179,8 @@ implements ServerProcedureInterface {
break;
case SERVER_CRASH_HANDLE_RIT2:
// Run the handleRIT again for case where another procedure managed to grab the lock on
// a region ahead of this crash handling procedure. Can happen in rare case. See
handleRIT(env, regionsOnCrashedServer);
// Noop. Left in place because we used to call handleRIT here for a second time
// but no longer necessary since HBASE-20634.
setNextState(ServerCrashState.SERVER_CRASH_FINISH);
break;
@ -232,11 +235,10 @@ implements ServerProcedureInterface {
AssignmentManager am = env.getMasterServices().getAssignmentManager();
// TODO: For Matteo. Below BLOCKs!!!! Redo so can relinquish executor while it is running.
// PROBLEM!!! WE BLOCK HERE.
am.getRegionStates().logSplitting(this.serverName);
mwm.splitLog(this.serverName);
if (LOG.isDebugEnabled()) {
LOG.debug("Done splitting WALs " + this);
}
am.getRegionStates().logSplit(this.serverName);
LOG.debug("Done splitting WALs {}", this);
}
@Override


@ -0,0 +1,164 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.AsyncAdmin;
import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.master.assignment.AssignProcedure;
import org.apache.hadoop.hbase.master.procedure.AbstractStateMachineRegionProcedure;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
/**
* Testcase for HBASE-20634
*/
@Category({ MasterTests.class, LargeTests.class })
public class TestServerCrashProcedureStuck {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestServerCrashProcedureStuck.class);
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
private static TableName TABLE_NAME = TableName.valueOf("test");
private static byte[] CF = Bytes.toBytes("cf");
private static CountDownLatch ARRIVE = new CountDownLatch(1);
private static CountDownLatch RESUME = new CountDownLatch(1);
public enum DummyState {
STATE
}
public static final class DummyRegionProcedure
extends AbstractStateMachineRegionProcedure<DummyState> {
public DummyRegionProcedure() {
}
public DummyRegionProcedure(MasterProcedureEnv env, RegionInfo hri) {
super(env, hri);
}
@Override
public TableOperationType getTableOperationType() {
return TableOperationType.REGION_EDIT;
}
@Override
protected Flow executeFromState(MasterProcedureEnv env, DummyState state)
throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
ARRIVE.countDown();
RESUME.await();
return Flow.NO_MORE_STATE;
}
@Override
protected void rollbackState(MasterProcedureEnv env, DummyState state)
throws IOException, InterruptedException {
}
@Override
protected DummyState getState(int stateId) {
return DummyState.STATE;
}
@Override
protected int getStateId(DummyState state) {
return 0;
}
@Override
protected DummyState getInitialState() {
return DummyState.STATE;
}
}
@BeforeClass
public static void setUp() throws Exception {
UTIL.startMiniCluster(3);
UTIL.getAdmin().balancerSwitch(false, true);
UTIL.createTable(TABLE_NAME, CF);
UTIL.waitTableAvailable(TABLE_NAME);
}
@AfterClass
public static void tearDown() throws Exception {
UTIL.shutdownMiniCluster();
}
@Test
public void test()
throws IOException, InterruptedException, ExecutionException, TimeoutException {
RegionServerThread rsThread = null;
for (RegionServerThread t : UTIL.getMiniHBaseCluster().getRegionServerThreads()) {
if (!t.getRegionServer().getRegions(TABLE_NAME).isEmpty()) {
rsThread = t;
break;
}
}
HRegionServer rs = rsThread.getRegionServer();
RegionInfo hri = rs.getRegions(TABLE_NAME).get(0).getRegionInfo();
HMaster master = UTIL.getMiniHBaseCluster().getMaster();
ProcedureExecutor<MasterProcedureEnv> executor = master.getMasterProcedureExecutor();
DummyRegionProcedure proc = new DummyRegionProcedure(executor.getEnvironment(), hri);
long procId = master.getMasterProcedureExecutor().submitProcedure(proc);
ARRIVE.await();
try (AsyncConnection conn =
ConnectionFactory.createAsyncConnection(UTIL.getConfiguration()).get()) {
AsyncAdmin admin = conn.getAdmin();
CompletableFuture<Void> future = admin.move(hri.getRegionName());
rs.abort("For testing!");
UTIL.waitFor(30000,
() -> executor.getProcedures().stream().filter(p -> p instanceof AssignProcedure)
.map(p -> (AssignProcedure) p)
.anyMatch(p -> Bytes.equals(hri.getRegionName(), p.getRegionInfo().getRegionName())));
RESUME.countDown();
UTIL.waitFor(30000, () -> executor.isFinished(procId));
// see whether the move region procedure can finish properly
future.get(30, TimeUnit.SECONDS);
}
}
}


@ -719,22 +719,38 @@ public class TestAssignmentManager {
protected CloseRegionResponse execCloseRegion(ServerName server, byte[] regionName)
throws IOException {
switch (this.invocations++) {
case 0: throw new NotServingRegionException("Fake");
case 1: throw new RegionServerAbortedException("Fake!");
case 2: throw new RegionServerStoppedException("Fake!");
case 3: throw new ServerNotRunningYetException("Fake!");
case 4:
LOG.info("Return null response from serverName=" + server + "; means STUCK...TODO timeout");
executor.schedule(new Runnable() {
@Override
public void run() {
LOG.info("Sending in CRASH of " + server);
doCrash(server);
}
}, 1, TimeUnit.SECONDS);
return null;
default:
return super.execCloseRegion(server, regionName);
case 0: throw new NotServingRegionException("Fake");
case 1:
executor.schedule(new Runnable() {
@Override
public void run() {
LOG.info("Sending in CRASH of " + server);
doCrash(server);
}
}, 1, TimeUnit.SECONDS);
throw new RegionServerAbortedException("Fake!");
case 2:
executor.schedule(new Runnable() {
@Override
public void run() {
LOG.info("Sending in CRASH of " + server);
doCrash(server);
}
}, 1, TimeUnit.SECONDS);
throw new RegionServerStoppedException("Fake!");
case 3: throw new ServerNotRunningYetException("Fake!");
case 4:
LOG.info("Returned null from serverName={}; means STUCK...TODO timeout", server);
executor.schedule(new Runnable() {
@Override
public void run() {
LOG.info("Sending in CRASH of " + server);
doCrash(server);
}
}, 1, TimeUnit.SECONDS);
return null;
default:
return super.execCloseRegion(server, regionName);
}
}
}