HBASE-18551 [AMv2] UnassignProcedure and crashed regionservers
If an unassign is unable to communicate with its target server, expire the server and then wait on a signal from ServerCrashProcedure before proceeding. The unassign holds the lock on the region, so no one else can proceed until we complete; we prevent any subsequent assign from running until logs have been split for the crashed server. In AssignProcedure, do not assign if the table is DISABLING or DISABLED.

M hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionTransitionProcedure.java
  Change remoteCallFailed so it returns a boolean indicating whether the implementor wants to stay suspended or not.

M hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/UnassignProcedure.java
  Doc. Also, if we are unable to talk to the remote server, expire it and then wait on SCP to wake us up after it has processed the failed server's logs.
parent 6114824b53
commit 2dd75d10f8
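In outline, the new contract: remoteCallFailed implementations return true when failure processing is complete, so the procedure is woken from its suspend, and false to stay suspended until something else (here, the ServerCrashProcedure) wakes it. A minimal sketch with simplified stand-in types, not the patch's code; the real signatures are in the RegionTransitionProcedure hunks below:

abstract class RemoteFailureContractSketch {
  /**
   * @return true if processing of the failure is complete and the procedure
   * should be woken from its suspend; false to remain suspended until another
   * actor (the ServerCrashProcedure) calls back in and wakes it.
   */
  protected abstract boolean remoteCallFailed(Exception cause);

  /** Caller side: wake the suspended procedure only if asked to. */
  final void onRemoteFailure(Exception cause, Runnable wakeEvent) {
    if (remoteCallFailed(cause)) {
      // Requeues the procedure; another worker may run it immediately,
      // so touch no state after this call.
      wakeEvent.run();
    }
    // else: stay suspended; the ServerCrashProcedure will wake us later.
  }
}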
@@ -315,7 +315,7 @@ public class ProcedureExecutor<TEnvironment> {
   @Override
   public void setMaxProcId(long maxProcId) {
     assert lastProcId.get() < 0 : "expected only one call to setMaxProcId()";
-    LOG.debug("Load maxProcId=" + maxProcId);
+    LOG.debug("Load max pid=" + maxProcId);
     lastProcId.set(maxProcId);
   }
@@ -727,7 +727,7 @@ public class ProcedureExecutor<TEnvironment> {
         !(procedures.containsKey(oldProcId) || completed.containsKey(oldProcId)) &&
         nonceKeysToProcIdsMap.containsKey(nonceKey)) {
       if (traceEnabled) {
-        LOG.trace("Waiting for procId=" + oldProcId.longValue() + " to be submitted");
+        LOG.trace("Waiting for pid=" + oldProcId.longValue() + " to be submitted");
       }
       Threads.sleep(100);
     }
@@ -999,9 +999,9 @@ public class ProcedureExecutor<TEnvironment> {
   public void removeResult(final long procId) {
     CompletedProcedureRetainer retainer = completed.get(procId);
     if (retainer == null) {
-      assert !procedures.containsKey(procId) : "procId=" + procId + " is still running";
+      assert !procedures.containsKey(procId) : "pid=" + procId + " is still running";
       if (LOG.isDebugEnabled()) {
-        LOG.debug("procId=" + procId + " already removed by the cleaner.");
+        LOG.debug("pid=" + procId + " already removed by the cleaner.");
       }
       return;
     }
@@ -1349,7 +1349,7 @@ public class ProcedureExecutor<TEnvironment> {
       return LockState.LOCK_YIELD_WAIT;
     } catch (Throwable e) {
       // Catch NullPointerExceptions or similar errors...
-      LOG.fatal("CODE-BUG: Uncaught runtime exception fo " + proc, e);
+      LOG.fatal("CODE-BUG: Uncaught runtime exception for " + proc, e);
     }

     // allows to kill the executor before something is stored to the wal.
@@ -1007,7 +1007,7 @@ public class MasterRpcServices extends RSRpcServices
   @Override
   public GetProcedureResultResponse getProcedureResult(RpcController controller,
       GetProcedureResultRequest request) throws ServiceException {
-    LOG.debug("Checking to see if procedure is done procId=" + request.getProcId());
+    LOG.debug("Checking to see if procedure is done pid=" + request.getProcId());
     try {
       master.checkInitialized();
       GetProcedureResultResponse.Builder builder = GetProcedureResultResponse.newBuilder();
@@ -240,7 +240,7 @@ public class TableNamespaceManager {
       // Sleep some
       Threads.sleep(10);
     }
-    throw new TimeoutIOException("Procedure " + procId + " is still running");
+    throw new TimeoutIOException("Procedure pid=" + procId + " is still running");
   }

   /**
@@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.client.TableState;
 /**
  * This is a helper class used to manage table states.
  * States persisted in tableinfo and cached internally.
+ * TODO: Cache state. Cut down on meta lookups.
  */
 @InterfaceAudience.Private
 public class TableStateManager {
@@ -27,10 +27,13 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.RetriesExhaustedException;
+import org.apache.hadoop.hbase.client.TableState;
 import org.apache.hadoop.hbase.exceptions.UnexpectedStateException;
 import org.apache.hadoop.hbase.master.RegionState.State;
+import org.apache.hadoop.hbase.master.TableStateManager;
 import org.apache.hadoop.hbase.master.assignment.RegionStates.RegionStateNode;
 import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
 import org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher.RegionOpenOperation;
@@ -150,6 +153,13 @@ public class AssignProcedure extends RegionTransitionProcedure {
       LOG.info("Assigned, not reassigning; " + this + "; " + regionNode.toShortString());
       return false;
     }
+    // Don't assign if table is in DISABLING or DISABLED state.
+    TableStateManager tsm = env.getMasterServices().getTableStateManager();
+    TableName tn = regionNode.getRegionInfo().getTable();
+    if (tsm.isTableState(tn, TableState.State.DISABLING, TableState.State.DISABLED)) {
+      LOG.info("Table is state=" + tsm.getTableState(tn) + ", skipping " + this);
+      return false;
+    }
     // If the region is SPLIT, we can't assign it. But state might be CLOSED, rather than
     // SPLIT which is what a region gets set to when Unassigned as part of SPLIT. FIX.
     if (regionNode.isInState(State.SPLIT) ||
@@ -321,9 +331,10 @@ public class AssignProcedure extends RegionTransitionProcedure {
   }

   @Override
-  protected void remoteCallFailed(final MasterProcedureEnv env, final RegionStateNode regionNode,
+  protected boolean remoteCallFailed(final MasterProcedureEnv env, final RegionStateNode regionNode,
       final IOException exception) {
     handleFailure(env, regionNode);
+    return true;
   }

   @Override
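The guard added above runs before any assign is queued. Sketched in isolation with a hypothetical enum and helper, not HBase's API:

final class AssignGuardSketch {
  enum TableState { ENABLED, DISABLING, DISABLED }

  /** Mirrors the new check: never queue an assign for a table going down. */
  static boolean shouldAssign(TableState state) {
    if (state == TableState.DISABLING || state == TableState.DISABLED) {
      return false; // skip; DisableTableProcedure owns unassigning the regions
    }
    return true;
  }
}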
@@ -165,7 +165,13 @@ public abstract class RegionTransitionProcedure
       RegionStateNode regionNode, TransitionCode code, long seqId) throws UnexpectedStateException;

   public abstract RemoteOperation remoteCallBuild(MasterProcedureEnv env, ServerName serverName);
-  protected abstract void remoteCallFailed(MasterProcedureEnv env,
+
+  /**
+   * @return True if processing of fail is complete; the procedure will be woken from its suspend
+   * and we'll go back to running through procedure steps;
+   * otherwise if false we leave the procedure in suspended state.
+   */
+  protected abstract boolean remoteCallFailed(MasterProcedureEnv env,
       RegionStateNode regionNode, IOException exception);

   @Override
@@ -181,13 +187,16 @@ public abstract class RegionTransitionProcedure
     assert serverName.equals(regionNode.getRegionLocation());
     String msg = exception.getMessage() == null? exception.getClass().getSimpleName():
       exception.getMessage();
-    LOG.warn("Failed " + this + "; " + regionNode.toShortString() + "; exception=" + msg);
-    remoteCallFailed(env, regionNode, exception);
-    // NOTE: This call to wakeEvent puts this Procedure back on the scheduler.
-    // Thereafter, another Worker can be in here so DO NOT MESS WITH STATE beyond
-    // this method. Just get out of this current processing quickly.
-    env.getProcedureScheduler().wakeEvent(regionNode.getProcedureEvent());
+    LOG.warn("Remote call failed " + this + "; " + regionNode.toShortString() +
+      "; exception=" + msg);
+    if (remoteCallFailed(env, regionNode, exception)) {
+      // NOTE: This call to wakeEvent puts this Procedure back on the scheduler.
+      // Thereafter, another Worker can be in here so DO NOT MESS WITH STATE beyond
+      // this method. Just get out of this current processing quickly.
+      env.getProcedureScheduler().wakeEvent(regionNode.getProcedureEvent());
+    }
+    // else leave the procedure in suspended state; it is waiting on another call to this callback
   }

   /**
    * Be careful! At the end of this method, the procedure has either succeeded
@@ -210,9 +219,10 @@ public abstract class RegionTransitionProcedure
     // from remote regionserver. Means Procedure associated ProcedureEvent is marked not 'ready'.
     env.getProcedureScheduler().suspendEvent(getRegionState(env).getProcedureEvent());

-    // Tricky because this can fail. If it fails need to backtrack on stuff like
-    // the 'suspend' done above -- tricky as the 'wake' requeues us -- and ditto
-    // up in the caller; it needs to undo state changes.
+    // Tricky because the below call to addOperationToNode can fail. If it fails, we need to
+    // backtrack on stuff like the 'suspend' done above -- tricky as the 'wake' requeues us -- and
+    // ditto up in the caller; it needs to undo state changes. Inside remoteCallFailed, it does
+    // the wake to undo the above suspend.
     if (!env.getRemoteDispatcher().addOperationToNode(targetServer, this)) {
       remoteCallFailed(env, targetServer,
         new FailedRemoteDispatchException(this + " to " + targetServer));
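The "tricky" comment above is the crux: the suspend happens before the dispatch, so a failed dispatch must undo it through the same remoteCallFailed/wake path. A toy rendering of that sequence, with simplified types in place of the real ProcedureScheduler and RemoteProcedureDispatcher:

final class DispatchBacktrackSketch {
  interface Dispatcher {
    boolean addOperationToNode(String server, Object operation);
  }

  private boolean suspended;

  boolean dispatchRemoteOp(Dispatcher dispatcher, String targetServer, Object op) {
    suspended = true; // suspendEvent(): mark the procedure's event not 'ready'
    if (!dispatcher.addOperationToNode(targetServer, op)) {
      // Dispatch never left the master; route through the same failure
      // callback. A 'true' answer wakes us, undoing the suspend above.
      if (remoteCallFailed(new java.io.IOException("failed remote dispatch"))) {
        suspended = false; // wakeEvent(): procedure requeued
      }
    }
    return true; // procedure stays alive; its state decides the next step
  }

  private boolean remoteCallFailed(java.io.IOException cause) {
    return true; // assign-style implementors: processing complete, retry
  }
}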
@@ -22,6 +22,7 @@ package org.apache.hadoop.hbase.master.assignment;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.net.ConnectException;
 import java.util.concurrent.atomic.AtomicBoolean;

 import org.apache.commons.logging.Log;
@@ -49,18 +50,28 @@ import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;

 /**
- * Procedure that describe the unassignment of a single region.
- * There can only be one RegionTransitionProcedure per region running at the time,
- * since each procedure takes a lock on the region.
+ * Procedure that describes the unassignment of a single region.
+ * There can only be one RegionTransitionProcedure -- i.e. an assign or an unassign -- per region
+ * running at a time, since each procedure takes a lock on the region.
  *
  * <p>The Unassign starts by placing a "close region" request in the Remote Dispatcher
- * queue, and the procedure will then go into a "waiting state".
+ * queue, and the procedure will then go into a "waiting state" (suspend).
  * The Remote Dispatcher will batch the various requests for that server and
  * they will be sent to the RS for execution.
  * The RS will complete the open operation by calling master.reportRegionStateTransition().
- * The AM will intercept the transition report, and notify the procedure.
- * The procedure will finish the unassign by publishing its new state on meta
- * or it will retry the unassign.
+ * The AM will intercept the transition report, and notify this procedure.
+ * The procedure will wakeup and finish the unassign by publishing its new state on meta.
+ * <p>If we are unable to contact the remote regionserver whether because of ConnectException
+ * or socket timeout, we will call expire on the server we were trying to contact. We will remain
+ * in suspended state waiting for a wake up from the ServerCrashProcedure that is processing the
+ * failed server. The basic idea is that if we notice a crashed server, then we have a
+ * responsibility; i.e. we should not let go of the region until we are sure the server that was
+ * hosting has had its crash processed. If we let go of the region before then, an assign might
+ * run before the logs have been split which would make for data loss.
+ *
+ * <p>TODO: Rather than this tricky coordination between SCP and this Procedure, instead, work on
+ * returning a SCP as our subprocedure; probably needs work on the framework to do this,
+ * especially if the SCP already created.
  */
 @InterfaceAudience.Private
 public class UnassignProcedure extends RegionTransitionProcedure {
@@ -75,8 +86,6 @@ public class UnassignProcedure extends RegionTransitionProcedure {
    */
   protected volatile ServerName destinationServer;

-  protected final AtomicBoolean serverCrashed = new AtomicBoolean(false);
-
   // TODO: should this be in a reassign procedure?
   // ...and keep unassign for 'disable' case?
   private boolean force;
@@ -161,15 +170,6 @@ public class UnassignProcedure extends RegionTransitionProcedure {
       return false;
     }

-    // if the server is down, mark the operation as failed. region cannot be unassigned
-    // if server is down
-    if (serverCrashed.get() || !isServerOnline(env, regionNode)) {
-      LOG.warn("Server already down: " + this + "; " + regionNode.toShortString());
-      setFailure("source region server not online",
-        new ServerCrashException(getProcId(), regionNode.getRegionLocation()));
-      return false;
-    }
-
     // if we haven't started the operation yet, we can abort
     if (aborted.get() && regionNode.isInState(State.OPEN)) {
       setAbortFailure(getClass().getSimpleName(), "abort requested");
@@ -181,12 +181,12 @@ public class UnassignProcedure extends RegionTransitionProcedure {

     // Add the close region operation the the server dispatch queue.
     if (!addToRemoteDispatcher(env, regionNode.getRegionLocation())) {
-      // If addToRemoteDispatcher fails, it calls #remoteCallFailed which
-      // does all cleanup.
+      // If addToRemoteDispatcher fails, it calls the callback #remoteCallFailed.
     }

     // We always return true, even if we fail dispatch because addToRemoteDispatcher
-    // failure processing sets state back to REGION_TRANSITION_QUEUE so we try again;
+    // failure processing sets state back to REGION_TRANSITION_QUEUE so we come back in here again
+    // to probably go into the clause after waitOnServerCrashProcedure;
     // i.e. return true to keep the Procedure running; it has been reset to startover.
     return true;
   }
@@ -218,13 +218,15 @@ public class UnassignProcedure extends RegionTransitionProcedure {
   }

   @Override
-  protected void remoteCallFailed(final MasterProcedureEnv env, final RegionStateNode regionNode,
+  protected boolean remoteCallFailed(final MasterProcedureEnv env, final RegionStateNode regionNode,
       final IOException exception) {
     // TODO: Is there on-going rpc to cleanup?
     if (exception instanceof ServerCrashException) {
-      // This exception comes from ServerCrashProcedure after log splitting.
-      // It is ok to let this procedure go on to complete close now.
-      // This will release lock on this region so the subsequent assign can succeed.
+      // SCP found this region as a RIT. Its call into here says it is ok to let this procedure go
+      // on to a complete close now. This will release lock on this region so subsequent action on
+      // region can succeed; e.g. the assign that follows this unassign when a move (w/o the wait
+      // on SCP, the assign could run w/o logs being split, so data loss).
       try {
         reportTransition(env, regionNode, TransitionCode.CLOSED, HConstants.NO_SEQNUM);
       } catch (UnexpectedStateException e) {
@@ -237,19 +239,22 @@ public class UnassignProcedure extends RegionTransitionProcedure {
       // TODO
       // RS is aborting, we cannot offline the region since the region may need to do WAL
       // recovery. Until we see the RS expiration, we should retry.
-      serverCrashed.set(true);
+      // TODO: This should be suspend like the below where we call expire on server?
+      LOG.info("Ignoring; waiting on ServerCrashProcedure", exception);
+      // serverCrashed.set(true);
     } else if (exception instanceof NotServingRegionException) {
-      LOG.info("IS THIS OK? ANY LOGS TO REPLAY; ACTING AS THOUGH ALL GOOD " + regionNode, exception);
+      LOG.info("IS THIS OK? ANY LOGS TO REPLAY; ACTING AS THOUGH ALL GOOD " + regionNode,
+        exception);
       setTransitionState(RegionTransitionState.REGION_TRANSITION_FINISH);
     } else {
-      // TODO: kill the server in case we get an exception we are not able to handle
-      LOG.warn("Killing server; unexpected exception; " +
-        this + "; " + regionNode.toShortString() +
-        " exception=" + exception);
+      LOG.warn("Expiring server " + this + "; " + regionNode.toShortString() +
+        ", exception=" + exception);
       env.getMasterServices().getServerManager().expireServer(regionNode.getRegionLocation());
-      serverCrashed.set(true);
+      // Return false so this procedure stays in suspended state. It will be woken up by a
+      // ServerCrashProcedure when it notices this RIT.
+      // TODO: Add a SCP as a new subprocedure that we now come to depend on.
+      return false;
     }
+    return true;
   }

   @Override
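Summed up, the reworked failure handling above boils down to a couple of outcomes. A hedged condensation with toy types; the authoritative logic is the remoteCallFailed hunk above:

final class UnassignFailureSketch {
  static final class CrashSignal extends java.io.IOException {}

  enum Action { FINISH_CLOSE, EXPIRE_AND_STAY_SUSPENDED }

  static Action onFailure(java.io.IOException e, Runnable expireServer) {
    if (e instanceof CrashSignal) {
      // ServerCrashProcedure already split the logs: safe to report CLOSED
      // and release the region lock.
      return Action.FINISH_CLOSE;
    }
    if (isNotServingRegion(e)) {
      // Server says it does not have the region: treat the close as done.
      return Action.FINISH_CLOSE;
    }
    // Unreachable server (ConnectException, timeout, ...): expire it and stay
    // suspended so no assign runs before the crashed server's logs are split.
    expireServer.run();
    return Action.EXPIRE_AND_STAY_SUSPENDED;
  }

  private static boolean isNotServingRegion(java.io.IOException e) {
    return false; // stand-in for the NotServingRegionException check
  }
}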
@@ -118,7 +118,7 @@ public class DisableTableProcedure
           postDisable(env, state);
           return Flow.NO_MORE_STATE;
         default:
-          throw new UnsupportedOperationException("unhandled state=" + state);
+          throw new UnsupportedOperationException("Unhandled state=" + state);
       }
     } catch (IOException e) {
       if (isRollbackSupported(state)) {
@@ -147,7 +147,7 @@ public class DisableTableProcedure
     }

     // The delete doesn't have a rollback. The execution will succeed, at some point.
-    throw new UnsupportedOperationException("unhandled state=" + state);
+    throw new UnsupportedOperationException("Unhandled state=" + state);
   }

   @Override
@@ -197,7 +197,7 @@ public class RSProcedureDispatcher
       }

       // trying to send the request elsewhere instead
-      LOG.warn(String.format("the request should be tried elsewhere instead; server=%s try=%d",
+      LOG.warn(String.format("Failed dispatch to server=%s try=%d",
         serverName, numberOfAttemptsSoFar), e);
       return false;
     }
@@ -23,7 +23,8 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;

 /**
  * Passed as Exception by {@link ServerCrashProcedure}
- * notifying on-going RIT that server has failed.
+ * notifying on-going RIT that server has failed. This exception is less an error-condition than
+ * it is a signal to waiting procedures that they can now proceed.
  */
 @InterfaceAudience.Private
 @SuppressWarnings("serial")
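The reworded javadoc matches how the exception is used: ServerCrashProcedure delivers it to each region-in-transition parked against the dead server as a go-ahead, not a failure. A sketch with toy types; the real callback signature is in the hunks above:

final class CrashSignalDeliverySketch {
  interface RegionProc {
    long getProcId();
    boolean remoteCallFailed(java.io.IOException signal);
  }

  static final class ServerCrashSignal extends java.io.IOException {
    ServerCrashSignal(long procId, String serverName) {
      super("pid=" + procId + ", server=" + serverName);
    }
  }

  /** After log split: tell each parked RIT it may now proceed. */
  static void notifyRITs(Iterable<RegionProc> ritsOnCrashedServer, String serverName) {
    for (RegionProc proc : ritsOnCrashedServer) {
      proc.remoteCallFailed(new ServerCrashSignal(proc.getProcId(), serverName));
    }
  }
}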
@@ -356,7 +356,8 @@ implements ServerProcedureInterface {
    * Handle any outstanding RIT that are up against this.serverName, the crashed server.
    * Notify them of crash. Remove assign entries from the passed in <code>regions</code>
    * otherwise we have two assigns going on and they will fight over who has lock.
-   * Notify Unassigns also.
+   * Notify Unassigns. If unable to unassign because server went away, unassigns block waiting
+   * on the below callback from a ServerCrashProcedure before proceeding.
    * @param env
    * @param regions Regions that were on crashed server
    */
@@ -403,12 +403,16 @@ public class TestSplitTransactionOnCluster {
       for (HRegion d: daughters) {
         LOG.info("Regions after crash: " + d);
       }
+      if (daughters.size() != regions.size()) {
+        LOG.info("Daughters=" + daughters.size() + ", regions=" + regions.size());
+      }
       assertEquals(daughters.size(), regions.size());
       for (HRegion r: regions) {
-        LOG.info("Regions post crash " + r);
+        LOG.info("Regions post crash " + r + ", contains=" + daughters.contains(r));
         assertTrue("Missing region post crash " + r, daughters.contains(r));
       }
     } finally {
+      LOG.info("EXITING");
       admin.setBalancerRunning(true, false);
       cluster.getMaster().setCatalogJanitorEnabled(true);
       t.close();
@@ -799,7 +803,7 @@ public class TestSplitTransactionOnCluster {
       throws IOException, InterruptedException {
     this.admin.splitRegion(hri.getRegionName());
     for (int i = 0; this.cluster.getRegions(hri.getTable()).size() <= regionCount && i < 60; i++) {
-      LOG.debug("Waiting on region to split");
+      LOG.debug("Waiting on region " + hri.getRegionNameAsString() + " to split");
       Thread.sleep(2000);
     }
@@ -827,10 +831,12 @@ public class TestSplitTransactionOnCluster {
     // the table region serving server.
     int metaServerIndex = cluster.getServerWithMeta();
-    HRegionServer metaRegionServer = cluster.getRegionServer(metaServerIndex);
+    assertTrue(metaServerIndex == -1); // meta is on master now
+    HRegionServer metaRegionServer = cluster.getMaster();
     int tableRegionIndex = cluster.getServerWith(hri.getRegionName());
     assertTrue(tableRegionIndex != -1);
     HRegionServer tableRegionServer = cluster.getRegionServer(tableRegionIndex);
+    LOG.info("MetaRegionServer=" + metaRegionServer.getServerName() +
+      ", other=" + tableRegionServer.getServerName());
     if (metaRegionServer.getServerName().equals(tableRegionServer.getServerName())) {
       HRegionServer hrs = getOtherRegionServer(cluster, metaRegionServer);
       assertNotNull(hrs);
@@ -848,8 +854,8 @@ public class TestSplitTransactionOnCluster {
         tableRegionIndex + " and metaServerIndex=" + metaServerIndex);
       Thread.sleep(100);
     }
-    assertTrue("Region not moved off hbase:meta server", tableRegionIndex != -1
-      && tableRegionIndex != metaServerIndex);
+    assertTrue("Region not moved off hbase:meta server, tableRegionIndex=" + tableRegionIndex,
+      tableRegionIndex != -1 && tableRegionIndex != metaServerIndex);
     // Verify for sure table region is not on same server as hbase:meta
     tableRegionIndex = cluster.getServerWith(hri.getRegionName());
     assertTrue(tableRegionIndex != -1);
@@ -899,7 +905,7 @@ public class TestSplitTransactionOnCluster {

   private void awaitDaughters(TableName tableName, int numDaughters) throws InterruptedException {
     // Wait till regions are back on line again.
-    for (int i=0; cluster.getRegions(tableName).size() < numDaughters && i<60; i++) {
+    for (int i = 0; cluster.getRegions(tableName).size() < numDaughters && i < 60; i++) {
       LOG.info("Waiting for repair to happen");
       Thread.sleep(1000);
     }