HBASE-18551 [AMv2] UnassignProcedure and crashed regionservers
If an unassign is unable to communicate with its target server, expire the server and then wait on a signal from ServerCrashProcedure before proceeding. The unassign holds the lock on the region, so no one else can proceed until we complete. We prevent any subsequent assign from running until logs have been split for the crashed server. In AssignProcedure, do not assign if the table is DISABLING or DISABLED. M hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionTransitionProcedure.java Change remoteCallFailed so it returns a boolean: true if failure processing is complete and the procedure should be woken from its suspend, false if the implementor wants to stay suspended. M hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/UnassignProcedure.java Doc. Also, if we are unable to talk to the remote server, expire it and then wait on SCP to wake us up after it has processed logs for the failed server.
This commit is contained in:
parent
0c16bb591b
commit
5940f4224c
|
@ -315,7 +315,7 @@ public class ProcedureExecutor<TEnvironment> {
|
||||||
@Override
|
@Override
|
||||||
public void setMaxProcId(long maxProcId) {
|
public void setMaxProcId(long maxProcId) {
|
||||||
assert lastProcId.get() < 0 : "expected only one call to setMaxProcId()";
|
assert lastProcId.get() < 0 : "expected only one call to setMaxProcId()";
|
||||||
LOG.debug("Load maxProcId=" + maxProcId);
|
LOG.debug("Load max pid=" + maxProcId);
|
||||||
lastProcId.set(maxProcId);
|
lastProcId.set(maxProcId);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -727,7 +727,7 @@ public class ProcedureExecutor<TEnvironment> {
|
||||||
!(procedures.containsKey(oldProcId) || completed.containsKey(oldProcId)) &&
|
!(procedures.containsKey(oldProcId) || completed.containsKey(oldProcId)) &&
|
||||||
nonceKeysToProcIdsMap.containsKey(nonceKey)) {
|
nonceKeysToProcIdsMap.containsKey(nonceKey)) {
|
||||||
if (traceEnabled) {
|
if (traceEnabled) {
|
||||||
LOG.trace("Waiting for procId=" + oldProcId.longValue() + " to be submitted");
|
LOG.trace("Waiting for pid=" + oldProcId.longValue() + " to be submitted");
|
||||||
}
|
}
|
||||||
Threads.sleep(100);
|
Threads.sleep(100);
|
||||||
}
|
}
|
||||||
|
@ -999,9 +999,9 @@ public class ProcedureExecutor<TEnvironment> {
|
||||||
public void removeResult(final long procId) {
|
public void removeResult(final long procId) {
|
||||||
CompletedProcedureRetainer retainer = completed.get(procId);
|
CompletedProcedureRetainer retainer = completed.get(procId);
|
||||||
if (retainer == null) {
|
if (retainer == null) {
|
||||||
assert !procedures.containsKey(procId) : "procId=" + procId + " is still running";
|
assert !procedures.containsKey(procId) : "pid=" + procId + " is still running";
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("procId=" + procId + " already removed by the cleaner.");
|
LOG.debug("pid=" + procId + " already removed by the cleaner.");
|
||||||
}
|
}
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -1349,7 +1349,7 @@ public class ProcedureExecutor<TEnvironment> {
|
||||||
return LockState.LOCK_YIELD_WAIT;
|
return LockState.LOCK_YIELD_WAIT;
|
||||||
} catch (Throwable e) {
|
} catch (Throwable e) {
|
||||||
// Catch NullPointerExceptions or similar errors...
|
// Catch NullPointerExceptions or similar errors...
|
||||||
LOG.fatal("CODE-BUG: Uncaught runtime exception fo " + proc, e);
|
LOG.fatal("CODE-BUG: Uncaught runtime exception for " + proc, e);
|
||||||
}
|
}
|
||||||
|
|
||||||
// allows to kill the executor before something is stored to the wal.
|
// allows to kill the executor before something is stored to the wal.
|
||||||
|
|
|
@ -1007,7 +1007,7 @@ public class MasterRpcServices extends RSRpcServices
|
||||||
@Override
|
@Override
|
||||||
public GetProcedureResultResponse getProcedureResult(RpcController controller,
|
public GetProcedureResultResponse getProcedureResult(RpcController controller,
|
||||||
GetProcedureResultRequest request) throws ServiceException {
|
GetProcedureResultRequest request) throws ServiceException {
|
||||||
LOG.debug("Checking to see if procedure is done procId=" + request.getProcId());
|
LOG.debug("Checking to see if procedure is done pid=" + request.getProcId());
|
||||||
try {
|
try {
|
||||||
master.checkInitialized();
|
master.checkInitialized();
|
||||||
GetProcedureResultResponse.Builder builder = GetProcedureResultResponse.newBuilder();
|
GetProcedureResultResponse.Builder builder = GetProcedureResultResponse.newBuilder();
|
||||||
|
|
|
@ -240,7 +240,7 @@ public class TableNamespaceManager {
|
||||||
// Sleep some
|
// Sleep some
|
||||||
Threads.sleep(10);
|
Threads.sleep(10);
|
||||||
}
|
}
|
||||||
throw new TimeoutIOException("Procedure " + procId + " is still running");
|
throw new TimeoutIOException("Procedure pid=" + procId + " is still running");
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.client.TableState;
|
||||||
/**
|
/**
|
||||||
* This is a helper class used to manage table states.
|
* This is a helper class used to manage table states.
|
||||||
* States persisted in tableinfo and cached internally.
|
* States persisted in tableinfo and cached internally.
|
||||||
|
* TODO: Cache state. Cut down on meta lookups.
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public class TableStateManager {
|
public class TableStateManager {
|
||||||
|
|
|
@ -27,10 +27,13 @@ import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.hbase.HRegionInfo;
|
import org.apache.hadoop.hbase.HRegionInfo;
|
||||||
import org.apache.hadoop.hbase.ServerName;
|
import org.apache.hadoop.hbase.ServerName;
|
||||||
|
import org.apache.hadoop.hbase.TableName;
|
||||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.hbase.client.RetriesExhaustedException;
|
import org.apache.hadoop.hbase.client.RetriesExhaustedException;
|
||||||
|
import org.apache.hadoop.hbase.client.TableState;
|
||||||
import org.apache.hadoop.hbase.exceptions.UnexpectedStateException;
|
import org.apache.hadoop.hbase.exceptions.UnexpectedStateException;
|
||||||
import org.apache.hadoop.hbase.master.RegionState.State;
|
import org.apache.hadoop.hbase.master.RegionState.State;
|
||||||
|
import org.apache.hadoop.hbase.master.TableStateManager;
|
||||||
import org.apache.hadoop.hbase.master.assignment.RegionStates.RegionStateNode;
|
import org.apache.hadoop.hbase.master.assignment.RegionStates.RegionStateNode;
|
||||||
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
|
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
|
||||||
import org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher.RegionOpenOperation;
|
import org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher.RegionOpenOperation;
|
||||||
|
@ -150,6 +153,13 @@ public class AssignProcedure extends RegionTransitionProcedure {
|
||||||
LOG.info("Assigned, not reassigning; " + this + "; " + regionNode.toShortString());
|
LOG.info("Assigned, not reassigning; " + this + "; " + regionNode.toShortString());
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
// Don't assign if table is in disabling or disabled state.
|
||||||
|
TableStateManager tsm = env.getMasterServices().getTableStateManager();
|
||||||
|
TableName tn = regionNode.getRegionInfo().getTable();
|
||||||
|
if (tsm.isTableState(tn, TableState.State.DISABLING, TableState.State.DISABLED)) {
|
||||||
|
LOG.info("Table " + tn + " state=" + tsm.getTableState(tn) + ", skipping " + this);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
// If the region is SPLIT, we can't assign it. But state might be CLOSED, rather than
|
// If the region is SPLIT, we can't assign it. But state might be CLOSED, rather than
|
||||||
// SPLIT which is what a region gets set to when Unassigned as part of SPLIT. FIX.
|
// SPLIT which is what a region gets set to when Unassigned as part of SPLIT. FIX.
|
||||||
if (regionNode.isInState(State.SPLIT) ||
|
if (regionNode.isInState(State.SPLIT) ||
|
||||||
|
@ -321,9 +331,10 @@ public class AssignProcedure extends RegionTransitionProcedure {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void remoteCallFailed(final MasterProcedureEnv env, final RegionStateNode regionNode,
|
protected boolean remoteCallFailed(final MasterProcedureEnv env, final RegionStateNode regionNode,
|
||||||
final IOException exception) {
|
final IOException exception) {
|
||||||
handleFailure(env, regionNode);
|
handleFailure(env, regionNode);
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -48,6 +48,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProto
|
||||||
* The AssignmentManager will notify this procedure when the RS completes
|
* The AssignmentManager will notify this procedure when the RS completes
|
||||||
* the operation and reports the transitioned state
|
* the operation and reports the transitioned state
|
||||||
* (see the Assign and Unassign class for more detail).
|
* (see the Assign and Unassign class for more detail).
|
||||||
|
*
|
||||||
* <p>Procedures move from the REGION_TRANSITION_QUEUE state when they are
|
* <p>Procedures move from the REGION_TRANSITION_QUEUE state when they are
|
||||||
* first submitted, to the REGION_TRANSITION_DISPATCH state when the request
|
* first submitted, to the REGION_TRANSITION_DISPATCH state when the request
|
||||||
* to remote server is sent and the Procedure is suspended waiting on external
|
* to remote server is sent and the Procedure is suspended waiting on external
|
||||||
|
@ -67,20 +68,15 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProto
|
||||||
* assignment to a different target server by setting {@link AssignProcedure#forceNewPlan}. When
|
* assignment to a different target server by setting {@link AssignProcedure#forceNewPlan}. When
|
||||||
* the number of attempts reaches the threshold configuration 'hbase.assignment.maximum.attempts',
|
* the number of attempts reaches the threshold configuration 'hbase.assignment.maximum.attempts',
|
||||||
* the procedure is aborted. For {@link UnassignProcedure}, similar re-attempts are
|
* the procedure is aborted. For {@link UnassignProcedure}, similar re-attempts are
|
||||||
* intentionally not implemented. It is a 'one shot' procedure.
|
* intentionally not implemented. It is a 'one shot' procedure. See its class doc for how it
|
||||||
|
* handles failure.
|
||||||
* </li>
|
* </li>
|
||||||
* </ul>
|
* </ul>
|
||||||
*
|
*
|
||||||
* <p>TODO: Considering it is a priority doing all we can to get make a region available as soon as possible,
|
* <p>TODO: Considering it is a priority doing all we can to get make a region available as soon as possible,
|
||||||
* re-attempting with any target makes sense if specified target fails in case of
|
* re-attempting with any target makes sense if specified target fails in case of
|
||||||
* {@link AssignProcedure}. For {@link UnassignProcedure}, if communication with RS fails,
|
* {@link AssignProcedure}. For {@link UnassignProcedure}, our concern is preventing data loss
|
||||||
* similar re-attempt makes little sense (what should be different from previous attempt?). Also it
|
* on failed unassign. See class doc for explanation.
|
||||||
* could be complex with current implementation of
|
|
||||||
* {@link RegionTransitionProcedure#execute(MasterProcedureEnv)} and {@link UnassignProcedure}.
|
|
||||||
* We have made a choice of keeping {@link UnassignProcedure} simple, where the procedure either
|
|
||||||
* succeeds or fails depending on communication with RS. As parent will have broader context, parent
|
|
||||||
* can better handle the failed instance of {@link UnassignProcedure}. Similar simplicity for
|
|
||||||
* {@link AssignProcedure} is desired and should be explored/ discussed further.
|
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public abstract class RegionTransitionProcedure
|
public abstract class RegionTransitionProcedure
|
||||||
|
@ -165,7 +161,13 @@ public abstract class RegionTransitionProcedure
|
||||||
RegionStateNode regionNode, TransitionCode code, long seqId) throws UnexpectedStateException;
|
RegionStateNode regionNode, TransitionCode code, long seqId) throws UnexpectedStateException;
|
||||||
|
|
||||||
public abstract RemoteOperation remoteCallBuild(MasterProcedureEnv env, ServerName serverName);
|
public abstract RemoteOperation remoteCallBuild(MasterProcedureEnv env, ServerName serverName);
|
||||||
protected abstract void remoteCallFailed(MasterProcedureEnv env,
|
|
||||||
|
/**
|
||||||
|
* @return True if processing of fail is complete; the procedure will be woken from its suspend
|
||||||
|
* and we'll go back to running through procedure steps:
|
||||||
|
* otherwise if false we leave the procedure in suspended state.
|
||||||
|
*/
|
||||||
|
protected abstract boolean remoteCallFailed(MasterProcedureEnv env,
|
||||||
RegionStateNode regionNode, IOException exception);
|
RegionStateNode regionNode, IOException exception);
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -181,13 +183,16 @@ public abstract class RegionTransitionProcedure
|
||||||
assert serverName.equals(regionNode.getRegionLocation());
|
assert serverName.equals(regionNode.getRegionLocation());
|
||||||
String msg = exception.getMessage() == null? exception.getClass().getSimpleName():
|
String msg = exception.getMessage() == null? exception.getClass().getSimpleName():
|
||||||
exception.getMessage();
|
exception.getMessage();
|
||||||
LOG.warn("Failed " + this + "; " + regionNode.toShortString() + "; exception=" + msg);
|
LOG.warn("Remote call failed " + this + "; " + regionNode.toShortString() +
|
||||||
remoteCallFailed(env, regionNode, exception);
|
"; exception=" + msg);
|
||||||
|
if (remoteCallFailed(env, regionNode, exception)) {
|
||||||
// NOTE: This call to wakeEvent puts this Procedure back on the scheduler.
|
// NOTE: This call to wakeEvent puts this Procedure back on the scheduler.
|
||||||
// Thereafter, another Worker can be in here so DO NOT MESS WITH STATE beyond
|
// Thereafter, another Worker can be in here so DO NOT MESS WITH STATE beyond
|
||||||
// this method. Just get out of this current processing quickly.
|
// this method. Just get out of this current processing quickly.
|
||||||
env.getProcedureScheduler().wakeEvent(regionNode.getProcedureEvent());
|
env.getProcedureScheduler().wakeEvent(regionNode.getProcedureEvent());
|
||||||
}
|
}
|
||||||
|
// else leave the procedure in suspended state; it is waiting on another call to this callback
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Be careful! At the end of this method, the procedure has either succeeded
|
* Be careful! At the end of this method, the procedure has either succeeded
|
||||||
|
@ -210,9 +215,10 @@ public abstract class RegionTransitionProcedure
|
||||||
// from remote regionserver. Means Procedure associated ProcedureEvent is marked not 'ready'.
|
// from remote regionserver. Means Procedure associated ProcedureEvent is marked not 'ready'.
|
||||||
env.getProcedureScheduler().suspendEvent(getRegionState(env).getProcedureEvent());
|
env.getProcedureScheduler().suspendEvent(getRegionState(env).getProcedureEvent());
|
||||||
|
|
||||||
// Tricky because this can fail. If it fails need to backtrack on stuff like
|
// Tricky because the below call to addOperationToNode can fail. If it fails, we need to
|
||||||
// the 'suspend' done above -- tricky as the 'wake' requeues us -- and ditto
|
// backtrack on stuff like the 'suspend' done above -- tricky as the 'wake' requeues us -- and
|
||||||
// up in the caller; it needs to undo state changes.
|
// ditto up in the caller; it needs to undo state changes. Inside in remoteCallFailed, it does
|
||||||
|
// wake to undo the above suspend.
|
||||||
if (!env.getRemoteDispatcher().addOperationToNode(targetServer, this)) {
|
if (!env.getRemoteDispatcher().addOperationToNode(targetServer, this)) {
|
||||||
remoteCallFailed(env, targetServer,
|
remoteCallFailed(env, targetServer,
|
||||||
new FailedRemoteDispatchException(this + " to " + targetServer));
|
new FailedRemoteDispatchException(this + " to " + targetServer));
|
||||||
|
|
|
@ -22,6 +22,7 @@ package org.apache.hadoop.hbase.master.assignment;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
import java.io.OutputStream;
|
import java.io.OutputStream;
|
||||||
|
import java.net.ConnectException;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
|
@ -49,18 +50,28 @@ import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Procedure that describe the unassignment of a single region.
|
* Procedure that describes the unassignment of a single region.
|
||||||
* There can only be one RegionTransitionProcedure per region running at the time,
|
* There can only be one RegionTransitionProcedure -- i.e. an assign or an unassign -- per region
|
||||||
* since each procedure takes a lock on the region.
|
* running at a time, since each procedure takes a lock on the region.
|
||||||
*
|
*
|
||||||
* <p>The Unassign starts by placing a "close region" request in the Remote Dispatcher
|
* <p>The Unassign starts by placing a "close region" request in the Remote Dispatcher
|
||||||
* queue, and the procedure will then go into a "waiting state".
|
* queue, and the procedure will then go into a "waiting state" (suspend).
|
||||||
* The Remote Dispatcher will batch the various requests for that server and
|
* The Remote Dispatcher will batch the various requests for that server and
|
||||||
* they will be sent to the RS for execution.
|
* they will be sent to the RS for execution.
|
||||||
* The RS will complete the close operation by calling master.reportRegionStateTransition().
|
* The RS will complete the close operation by calling master.reportRegionStateTransition().
|
||||||
* The AM will intercept the transition report, and notify the procedure.
|
* The AM will intercept the transition report, and notify this procedure.
|
||||||
* The procedure will finish the unassign by publishing its new state on meta
|
* The procedure will wakeup and finish the unassign by publishing its new state on meta.
|
||||||
* or it will retry the unassign.
|
* <p>If we are unable to contact the remote regionserver whether because of ConnectException
|
||||||
|
* or socket timeout, we will call expire on the server we were trying to contact. We will remain
|
||||||
|
* in suspended state waiting for a wake up from the ServerCrashProcedure that is processing the
|
||||||
|
* failed server. The basic idea is that if we notice a crashed server, then we have a
|
||||||
|
* responsibility; i.e. we should not let go of the region until we are sure the server that was
|
||||||
|
* hosting has had its crash processed. If we let go of the region before then, an assign might
|
||||||
|
* run before the logs have been split which would make for data loss.
|
||||||
|
*
|
||||||
|
* <p>TODO: Rather than this tricky coordination between SCP and this Procedure, instead, work on
|
||||||
|
* returning a SCP as our subprocedure; probably needs work on the framework to do this,
|
||||||
|
* especially if the SCP already created.
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public class UnassignProcedure extends RegionTransitionProcedure {
|
public class UnassignProcedure extends RegionTransitionProcedure {
|
||||||
|
@ -75,8 +86,6 @@ public class UnassignProcedure extends RegionTransitionProcedure {
|
||||||
*/
|
*/
|
||||||
protected volatile ServerName destinationServer;
|
protected volatile ServerName destinationServer;
|
||||||
|
|
||||||
protected final AtomicBoolean serverCrashed = new AtomicBoolean(false);
|
|
||||||
|
|
||||||
// TODO: should this be in a reassign procedure?
|
// TODO: should this be in a reassign procedure?
|
||||||
// ...and keep unassign for 'disable' case?
|
// ...and keep unassign for 'disable' case?
|
||||||
private boolean force;
|
private boolean force;
|
||||||
|
@ -161,15 +170,6 @@ public class UnassignProcedure extends RegionTransitionProcedure {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
// if the server is down, mark the operation as failed. region cannot be unassigned
|
|
||||||
// if server is down
|
|
||||||
if (serverCrashed.get() || !isServerOnline(env, regionNode)) {
|
|
||||||
LOG.warn("Server already down: " + this + "; " + regionNode.toShortString());
|
|
||||||
setFailure("source region server not online",
|
|
||||||
new ServerCrashException(getProcId(), regionNode.getRegionLocation()));
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
// if we haven't started the operation yet, we can abort
|
// if we haven't started the operation yet, we can abort
|
||||||
if (aborted.get() && regionNode.isInState(State.OPEN)) {
|
if (aborted.get() && regionNode.isInState(State.OPEN)) {
|
||||||
setAbortFailure(getClass().getSimpleName(), "abort requested");
|
setAbortFailure(getClass().getSimpleName(), "abort requested");
|
||||||
|
@ -181,13 +181,10 @@ public class UnassignProcedure extends RegionTransitionProcedure {
|
||||||
|
|
||||||
// Add the close region operation to the server dispatch queue.
|
// Add the close region operation to the server dispatch queue.
|
||||||
if (!addToRemoteDispatcher(env, regionNode.getRegionLocation())) {
|
if (!addToRemoteDispatcher(env, regionNode.getRegionLocation())) {
|
||||||
// If addToRemoteDispatcher fails, it calls #remoteCallFailed which
|
// If addToRemoteDispatcher fails, it calls the callback #remoteCallFailed.
|
||||||
// does all cleanup.
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// We always return true, even if we fail dispatch because addToRemoteDispatcher
|
// Return true to keep the procedure running.
|
||||||
// failure processing sets state back to REGION_TRANSITION_QUEUE so we try again;
|
|
||||||
// i.e. return true to keep the Procedure running; it has been reset to startover.
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -218,13 +215,15 @@ public class UnassignProcedure extends RegionTransitionProcedure {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void remoteCallFailed(final MasterProcedureEnv env, final RegionStateNode regionNode,
|
protected boolean remoteCallFailed(final MasterProcedureEnv env, final RegionStateNode regionNode,
|
||||||
final IOException exception) {
|
final IOException exception) {
|
||||||
// TODO: Is there on-going rpc to cleanup?
|
// TODO: Is there on-going rpc to cleanup?
|
||||||
if (exception instanceof ServerCrashException) {
|
if (exception instanceof ServerCrashException) {
|
||||||
// This exception comes from ServerCrashProcedure after log splitting.
|
// This exception comes from ServerCrashProcedure after log splitting.
|
||||||
// It is ok to let this procedure go on to complete close now.
|
// SCP found this region as a RIT. Its call into here says it is ok to let this procedure go
|
||||||
// This will release lock on this region so the subsequent assign can succeed.
|
// on to a complete close now. This will release lock on this region so subsequent action on
|
||||||
|
// region can succeed; e.g. the assign that follows this unassign when a move (w/o wait on SCP
|
||||||
|
// the assign could run w/o logs being split so data loss).
|
||||||
try {
|
try {
|
||||||
reportTransition(env, regionNode, TransitionCode.CLOSED, HConstants.NO_SEQNUM);
|
reportTransition(env, regionNode, TransitionCode.CLOSED, HConstants.NO_SEQNUM);
|
||||||
} catch (UnexpectedStateException e) {
|
} catch (UnexpectedStateException e) {
|
||||||
|
@ -237,19 +236,22 @@ public class UnassignProcedure extends RegionTransitionProcedure {
|
||||||
// TODO
|
// TODO
|
||||||
// RS is aborting, we cannot offline the region since the region may need to do WAL
|
// RS is aborting, we cannot offline the region since the region may need to do WAL
|
||||||
// recovery. Until we see the RS expiration, we should retry.
|
// recovery. Until we see the RS expiration, we should retry.
|
||||||
|
// TODO: This should be suspend like the below where we call expire on server?
|
||||||
LOG.info("Ignoring; waiting on ServerCrashProcedure", exception);
|
LOG.info("Ignoring; waiting on ServerCrashProcedure", exception);
|
||||||
// serverCrashed.set(true);
|
|
||||||
} else if (exception instanceof NotServingRegionException) {
|
} else if (exception instanceof NotServingRegionException) {
|
||||||
LOG.info("IS THIS OK? ANY LOGS TO REPLAY; ACTING AS THOUGH ALL GOOD " + regionNode, exception);
|
LOG.info("IS THIS OK? ANY LOGS TO REPLAY; ACTING AS THOUGH ALL GOOD " + regionNode,
|
||||||
|
exception);
|
||||||
setTransitionState(RegionTransitionState.REGION_TRANSITION_FINISH);
|
setTransitionState(RegionTransitionState.REGION_TRANSITION_FINISH);
|
||||||
} else {
|
} else {
|
||||||
// TODO: kill the server in case we get an exception we are not able to handle
|
LOG.warn("Expiring server " + this + "; " + regionNode.toShortString() +
|
||||||
LOG.warn("Killing server; unexpected exception; " +
|
", exception=" + exception);
|
||||||
this + "; " + regionNode.toShortString() +
|
|
||||||
" exception=" + exception);
|
|
||||||
env.getMasterServices().getServerManager().expireServer(regionNode.getRegionLocation());
|
env.getMasterServices().getServerManager().expireServer(regionNode.getRegionLocation());
|
||||||
serverCrashed.set(true);
|
// Return false so this procedure stays in suspended state. It will be woken up by a
|
||||||
|
// ServerCrashProcedure when it notices this RIT.
|
||||||
|
// TODO: Add a SCP as a new subprocedure that we now come to depend on.
|
||||||
|
return false;
|
||||||
}
|
}
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -118,7 +118,7 @@ public class DisableTableProcedure
|
||||||
postDisable(env, state);
|
postDisable(env, state);
|
||||||
return Flow.NO_MORE_STATE;
|
return Flow.NO_MORE_STATE;
|
||||||
default:
|
default:
|
||||||
throw new UnsupportedOperationException("unhandled state=" + state);
|
throw new UnsupportedOperationException("Unhandled state=" + state);
|
||||||
}
|
}
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
if (isRollbackSupported(state)) {
|
if (isRollbackSupported(state)) {
|
||||||
|
@ -147,7 +147,7 @@ public class DisableTableProcedure
|
||||||
}
|
}
|
||||||
|
|
||||||
// The delete doesn't have a rollback. The execution will succeed, at some point.
|
// The delete doesn't have a rollback. The execution will succeed, at some point.
|
||||||
throw new UnsupportedOperationException("unhandled state=" + state);
|
throw new UnsupportedOperationException("Unhandled state=" + state);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -197,7 +197,7 @@ public class RSProcedureDispatcher
|
||||||
}
|
}
|
||||||
|
|
||||||
// trying to send the request elsewhere instead
|
// trying to send the request elsewhere instead
|
||||||
LOG.warn(String.format("the request should be tried elsewhere instead; server=%s try=%d",
|
LOG.warn(String.format("Failed dispatch to server=%s try=%d",
|
||||||
serverName, numberOfAttemptsSoFar), e);
|
serverName, numberOfAttemptsSoFar), e);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
|
@ -23,7 +23,8 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Passed as Exception by {@link ServerCrashProcedure}
|
* Passed as Exception by {@link ServerCrashProcedure}
|
||||||
* notifying on-going RIT that server has failed.
|
* notifying on-going RIT that server has failed. This exception is less an error-condition than
|
||||||
|
* it is a signal to waiting procedures that they can now proceed.
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
@SuppressWarnings("serial")
|
@SuppressWarnings("serial")
|
||||||
|
|
|
@ -356,7 +356,8 @@ implements ServerProcedureInterface {
|
||||||
* Handle any outstanding RIT that are up against this.serverName, the crashed server.
|
* Handle any outstanding RIT that are up against this.serverName, the crashed server.
|
||||||
* Notify them of crash. Remove assign entries from the passed in <code>regions</code>
|
* Notify them of crash. Remove assign entries from the passed in <code>regions</code>
|
||||||
* otherwise we have two assigns going on and they will fight over who has lock.
|
* otherwise we have two assigns going on and they will fight over who has lock.
|
||||||
* Notify Unassigns also.
|
* Notify Unassigns. If unable to unassign because server went away, unassigns block waiting
|
||||||
|
* on the below callback from a ServerCrashProcedure before proceeding.
|
||||||
* @param env
|
* @param env
|
||||||
* @param regions Regions that were on crashed server
|
* @param regions Regions that were on crashed server
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -403,12 +403,16 @@ public class TestSplitTransactionOnCluster {
|
||||||
for (HRegion d: daughters) {
|
for (HRegion d: daughters) {
|
||||||
LOG.info("Regions after crash: " + d);
|
LOG.info("Regions after crash: " + d);
|
||||||
}
|
}
|
||||||
|
if (daughters.size() != regions.size()) {
|
||||||
|
LOG.info("Daughters=" + daughters.size() + ", regions=" + regions.size());
|
||||||
|
}
|
||||||
assertEquals(daughters.size(), regions.size());
|
assertEquals(daughters.size(), regions.size());
|
||||||
for (HRegion r: regions) {
|
for (HRegion r: regions) {
|
||||||
LOG.info("Regions post crash " + r);
|
LOG.info("Regions post crash " + r + ", contains=" + daughters.contains(r));
|
||||||
assertTrue("Missing region post crash " + r, daughters.contains(r));
|
assertTrue("Missing region post crash " + r, daughters.contains(r));
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
|
LOG.info("EXITING");
|
||||||
admin.setBalancerRunning(true, false);
|
admin.setBalancerRunning(true, false);
|
||||||
cluster.getMaster().setCatalogJanitorEnabled(true);
|
cluster.getMaster().setCatalogJanitorEnabled(true);
|
||||||
t.close();
|
t.close();
|
||||||
|
@ -799,7 +803,7 @@ public class TestSplitTransactionOnCluster {
|
||||||
throws IOException, InterruptedException {
|
throws IOException, InterruptedException {
|
||||||
this.admin.splitRegion(hri.getRegionName());
|
this.admin.splitRegion(hri.getRegionName());
|
||||||
for (int i = 0; this.cluster.getRegions(hri.getTable()).size() <= regionCount && i < 60; i++) {
|
for (int i = 0; this.cluster.getRegions(hri.getTable()).size() <= regionCount && i < 60; i++) {
|
||||||
LOG.debug("Waiting on region to split");
|
LOG.debug("Waiting on region " + hri.getRegionNameAsString() + " to split");
|
||||||
Thread.sleep(2000);
|
Thread.sleep(2000);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -827,10 +831,13 @@ public class TestSplitTransactionOnCluster {
|
||||||
// the table region serving server.
|
// the table region serving server.
|
||||||
int metaServerIndex = cluster.getServerWithMeta();
|
int metaServerIndex = cluster.getServerWithMeta();
|
||||||
assertTrue(metaServerIndex == -1); // meta is on master now
|
assertTrue(metaServerIndex == -1); // meta is on master now
|
||||||
|
// TODO: When we change master so it doesn't carry regions, be careful here.
|
||||||
HRegionServer metaRegionServer = cluster.getMaster();
|
HRegionServer metaRegionServer = cluster.getMaster();
|
||||||
int tableRegionIndex = cluster.getServerWith(hri.getRegionName());
|
int tableRegionIndex = cluster.getServerWith(hri.getRegionName());
|
||||||
assertTrue(tableRegionIndex != -1);
|
assertTrue(tableRegionIndex != -1);
|
||||||
HRegionServer tableRegionServer = cluster.getRegionServer(tableRegionIndex);
|
HRegionServer tableRegionServer = cluster.getRegionServer(tableRegionIndex);
|
||||||
|
LOG.info("MetaRegionServer=" + metaRegionServer.getServerName() +
|
||||||
|
", other=" + tableRegionServer.getServerName());
|
||||||
if (metaRegionServer.getServerName().equals(tableRegionServer.getServerName())) {
|
if (metaRegionServer.getServerName().equals(tableRegionServer.getServerName())) {
|
||||||
HRegionServer hrs = getOtherRegionServer(cluster, metaRegionServer);
|
HRegionServer hrs = getOtherRegionServer(cluster, metaRegionServer);
|
||||||
assertNotNull(hrs);
|
assertNotNull(hrs);
|
||||||
|
@ -848,8 +855,8 @@ public class TestSplitTransactionOnCluster {
|
||||||
tableRegionIndex + " and metaServerIndex=" + metaServerIndex);
|
tableRegionIndex + " and metaServerIndex=" + metaServerIndex);
|
||||||
Thread.sleep(100);
|
Thread.sleep(100);
|
||||||
}
|
}
|
||||||
assertTrue("Region not moved off hbase:meta server", tableRegionIndex != -1
|
assertTrue("Region not moved off hbase:meta server, tableRegionIndex=" + tableRegionIndex,
|
||||||
&& tableRegionIndex != metaServerIndex);
|
tableRegionIndex != -1 && tableRegionIndex != metaServerIndex);
|
||||||
// Verify for sure table region is not on same server as hbase:meta
|
// Verify for sure table region is not on same server as hbase:meta
|
||||||
tableRegionIndex = cluster.getServerWith(hri.getRegionName());
|
tableRegionIndex = cluster.getServerWith(hri.getRegionName());
|
||||||
assertTrue(tableRegionIndex != -1);
|
assertTrue(tableRegionIndex != -1);
|
||||||
|
|
Loading…
Reference in New Issue