Revert "HBASE-18551 [AMv2] UnassignProcedure and crashed regionservers"
This reverts commit 2dd75d10f8.
This commit is contained in:
parent
b65f119c78
commit
e4ba404a5a
|
@ -315,7 +315,7 @@ public class ProcedureExecutor<TEnvironment> {
|
|||
@Override
|
||||
public void setMaxProcId(long maxProcId) {
|
||||
assert lastProcId.get() < 0 : "expected only one call to setMaxProcId()";
|
||||
LOG.debug("Load max pid=" + maxProcId);
|
||||
LOG.debug("Load maxProcId=" + maxProcId);
|
||||
lastProcId.set(maxProcId);
|
||||
}
|
||||
|
||||
|
@ -727,7 +727,7 @@ public class ProcedureExecutor<TEnvironment> {
|
|||
!(procedures.containsKey(oldProcId) || completed.containsKey(oldProcId)) &&
|
||||
nonceKeysToProcIdsMap.containsKey(nonceKey)) {
|
||||
if (traceEnabled) {
|
||||
LOG.trace("Waiting for pid=" + oldProcId.longValue() + " to be submitted");
|
||||
LOG.trace("Waiting for procId=" + oldProcId.longValue() + " to be submitted");
|
||||
}
|
||||
Threads.sleep(100);
|
||||
}
|
||||
|
@ -999,9 +999,9 @@ public class ProcedureExecutor<TEnvironment> {
|
|||
public void removeResult(final long procId) {
|
||||
CompletedProcedureRetainer retainer = completed.get(procId);
|
||||
if (retainer == null) {
|
||||
assert !procedures.containsKey(procId) : "pid=" + procId + " is still running";
|
||||
assert !procedures.containsKey(procId) : "procId=" + procId + " is still running";
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("pid=" + procId + " already removed by the cleaner.");
|
||||
LOG.debug("procId=" + procId + " already removed by the cleaner.");
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
@ -1349,7 +1349,7 @@ public class ProcedureExecutor<TEnvironment> {
|
|||
return LockState.LOCK_YIELD_WAIT;
|
||||
} catch (Throwable e) {
|
||||
// Catch NullPointerExceptions or similar errors...
|
||||
LOG.fatal("CODE-BUG: Uncaught runtime exception for " + proc, e);
|
||||
LOG.fatal("CODE-BUG: Uncaught runtime exception fo " + proc, e);
|
||||
}
|
||||
|
||||
// allows to kill the executor before something is stored to the wal.
|
||||
|
|
|
@ -1007,7 +1007,7 @@ public class MasterRpcServices extends RSRpcServices
|
|||
@Override
|
||||
public GetProcedureResultResponse getProcedureResult(RpcController controller,
|
||||
GetProcedureResultRequest request) throws ServiceException {
|
||||
LOG.debug("Checking to see if procedure is done pid=" + request.getProcId());
|
||||
LOG.debug("Checking to see if procedure is done procId=" + request.getProcId());
|
||||
try {
|
||||
master.checkInitialized();
|
||||
GetProcedureResultResponse.Builder builder = GetProcedureResultResponse.newBuilder();
|
||||
|
|
|
@ -240,7 +240,7 @@ public class TableNamespaceManager {
|
|||
// Sleep some
|
||||
Threads.sleep(10);
|
||||
}
|
||||
throw new TimeoutIOException("Procedure pid=" + procId + " is still running");
|
||||
throw new TimeoutIOException("Procedure " + procId + " is still running");
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -42,7 +42,6 @@ import org.apache.hadoop.hbase.client.TableState;
|
|||
/**
|
||||
* This is a helper class used to manage table states.
|
||||
* States persisted in tableinfo and cached internally.
|
||||
* TODO: Cache state. Cut down on meta lookups.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class TableStateManager {
|
||||
|
|
|
@ -27,13 +27,10 @@ import org.apache.commons.logging.Log;
|
|||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.client.RetriesExhaustedException;
|
||||
import org.apache.hadoop.hbase.client.TableState;
|
||||
import org.apache.hadoop.hbase.exceptions.UnexpectedStateException;
|
||||
import org.apache.hadoop.hbase.master.RegionState.State;
|
||||
import org.apache.hadoop.hbase.master.TableStateManager;
|
||||
import org.apache.hadoop.hbase.master.assignment.RegionStates.RegionStateNode;
|
||||
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
|
||||
import org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher.RegionOpenOperation;
|
||||
|
@ -153,13 +150,6 @@ public class AssignProcedure extends RegionTransitionProcedure {
|
|||
LOG.info("Assigned, not reassigning; " + this + "; " + regionNode.toShortString());
|
||||
return false;
|
||||
}
|
||||
// Don't assign if table is in disabling or disabled state.
|
||||
TableStateManager tsm = env.getMasterServices().getTableStateManager();
|
||||
TableName tn = regionNode.getRegionInfo().getTable();
|
||||
if (tsm.isTableState(tn, TableState.State.DISABLING, TableState.State.DISABLED)) {
|
||||
LOG.info("Table is state=" + tsm.getTableState(tn) + ", skipping " + this);
|
||||
return false;
|
||||
}
|
||||
// If the region is SPLIT, we can't assign it. But state might be CLOSED, rather than
|
||||
// SPLIT which is what a region gets set to when Unassigned as part of SPLIT. FIX.
|
||||
if (regionNode.isInState(State.SPLIT) ||
|
||||
|
@ -331,10 +321,9 @@ public class AssignProcedure extends RegionTransitionProcedure {
|
|||
}
|
||||
|
||||
@Override
|
||||
protected boolean remoteCallFailed(final MasterProcedureEnv env, final RegionStateNode regionNode,
|
||||
protected void remoteCallFailed(final MasterProcedureEnv env, final RegionStateNode regionNode,
|
||||
final IOException exception) {
|
||||
handleFailure(env, regionNode);
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -165,13 +165,7 @@ public abstract class RegionTransitionProcedure
|
|||
RegionStateNode regionNode, TransitionCode code, long seqId) throws UnexpectedStateException;
|
||||
|
||||
public abstract RemoteOperation remoteCallBuild(MasterProcedureEnv env, ServerName serverName);
|
||||
|
||||
/**
|
||||
* @return True if processing of fail is complete; the procedure will be woken from its suspend
|
||||
* and we'll go back to running through procedure steps:
|
||||
* otherwise if false we leave the procedure in suspended state.
|
||||
*/
|
||||
protected abstract boolean remoteCallFailed(MasterProcedureEnv env,
|
||||
protected abstract void remoteCallFailed(MasterProcedureEnv env,
|
||||
RegionStateNode regionNode, IOException exception);
|
||||
|
||||
@Override
|
||||
|
@ -187,16 +181,13 @@ public abstract class RegionTransitionProcedure
|
|||
assert serverName.equals(regionNode.getRegionLocation());
|
||||
String msg = exception.getMessage() == null? exception.getClass().getSimpleName():
|
||||
exception.getMessage();
|
||||
LOG.warn("Remote call failed " + this + "; " + regionNode.toShortString() +
|
||||
"; exception=" + msg);
|
||||
if (remoteCallFailed(env, regionNode, exception)) {
|
||||
LOG.warn("Failed " + this + "; " + regionNode.toShortString() + "; exception=" + msg);
|
||||
remoteCallFailed(env, regionNode, exception);
|
||||
// NOTE: This call to wakeEvent puts this Procedure back on the scheduler.
|
||||
// Thereafter, another Worker can be in here so DO NOT MESS WITH STATE beyond
|
||||
// this method. Just get out of this current processing quickly.
|
||||
env.getProcedureScheduler().wakeEvent(regionNode.getProcedureEvent());
|
||||
}
|
||||
// else leave the procedure in suspended state; it is waiting on another call to this callback
|
||||
}
|
||||
|
||||
/**
|
||||
* Be careful! At the end of this method, the procedure has either succeeded
|
||||
|
@ -219,10 +210,9 @@ public abstract class RegionTransitionProcedure
|
|||
// from remote regionserver. Means Procedure associated ProcedureEvent is marked not 'ready'.
|
||||
env.getProcedureScheduler().suspendEvent(getRegionState(env).getProcedureEvent());
|
||||
|
||||
// Tricky because the below call to addOperationToNode can fail. If it fails, we need to
|
||||
// backtrack on stuff like the 'suspend' done above -- tricky as the 'wake' requeues us -- and
|
||||
// ditto up in the caller; it needs to undo state changes. Inside in remoteCallFailed, it does
|
||||
// wake to undo the above suspend.
|
||||
// Tricky because this can fail. If it fails need to backtrack on stuff like
|
||||
// the 'suspend' done above -- tricky as the 'wake' requeues us -- and ditto
|
||||
// up in the caller; it needs to undo state changes.
|
||||
if (!env.getRemoteDispatcher().addOperationToNode(targetServer, this)) {
|
||||
remoteCallFailed(env, targetServer,
|
||||
new FailedRemoteDispatchException(this + " to " + targetServer));
|
||||
|
|
|
@ -22,7 +22,6 @@ package org.apache.hadoop.hbase.master.assignment;
|
|||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.net.ConnectException;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
|
@ -50,28 +49,18 @@ import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
|
|||
|
||||
|
||||
/**
|
||||
* Procedure that describes the unassignment of a single region.
|
||||
* There can only be one RegionTransitionProcedure -- i.e. an assign or an unassign -- per region
|
||||
* running at a time, since each procedure takes a lock on the region.
|
||||
* Procedure that describe the unassignment of a single region.
|
||||
* There can only be one RegionTransitionProcedure per region running at the time,
|
||||
* since each procedure takes a lock on the region.
|
||||
*
|
||||
* <p>The Unassign starts by placing a "close region" request in the Remote Dispatcher
|
||||
* queue, and the procedure will then go into a "waiting state" (suspend).
|
||||
* queue, and the procedure will then go into a "waiting state".
|
||||
* The Remote Dispatcher will batch the various requests for that server and
|
||||
* they will be sent to the RS for execution.
|
||||
* The RS will complete the close operation by calling master.reportRegionStateTransition().
|
||||
* The AM will intercept the transition report, and notify this procedure.
|
||||
* The procedure will wakeup and finish the unassign by publishing its new state on meta.
|
||||
* <p>If we are unable to contact the remote regionserver whether because of ConnectException
|
||||
* or socket timeout, we will call expire on the server we were trying to contact. We will remain
|
||||
* in suspended state waiting for a wake up from the ServerCrashProcedure that is processing the
|
||||
* failed server. The basic idea is that if we notice a crashed server, then we have a
|
||||
* responsibility; i.e. we should not let go of the region until we are sure the server that was
|
||||
* hosting has had its crash processed. If we let go of the region before then, an assign might
|
||||
* run before the logs have been split which would make for data loss.
|
||||
*
|
||||
* <p>TODO: Rather than this tricky coordination between SCP and this Procedure, instead, work on
|
||||
* returning a SCP as our subprocedure; probably needs work on the framework to do this,
|
||||
* especially if the SCP already created.
|
||||
* The AM will intercept the transition report, and notify the procedure.
|
||||
* The procedure will finish the unassign by publishing its new state on meta
|
||||
* or it will retry the unassign.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class UnassignProcedure extends RegionTransitionProcedure {
|
||||
|
@ -86,6 +75,8 @@ public class UnassignProcedure extends RegionTransitionProcedure {
|
|||
*/
|
||||
protected volatile ServerName destinationServer;
|
||||
|
||||
protected final AtomicBoolean serverCrashed = new AtomicBoolean(false);
|
||||
|
||||
// TODO: should this be in a reassign procedure?
|
||||
// ...and keep unassign for 'disable' case?
|
||||
private boolean force;
|
||||
|
@ -170,6 +161,15 @@ public class UnassignProcedure extends RegionTransitionProcedure {
|
|||
return false;
|
||||
}
|
||||
|
||||
// if the server is down, mark the operation as failed. region cannot be unassigned
|
||||
// if server is down
|
||||
if (serverCrashed.get() || !isServerOnline(env, regionNode)) {
|
||||
LOG.warn("Server already down: " + this + "; " + regionNode.toShortString());
|
||||
setFailure("source region server not online",
|
||||
new ServerCrashException(getProcId(), regionNode.getRegionLocation()));
|
||||
return false;
|
||||
}
|
||||
|
||||
// if we haven't started the operation yet, we can abort
|
||||
if (aborted.get() && regionNode.isInState(State.OPEN)) {
|
||||
setAbortFailure(getClass().getSimpleName(), "abort requested");
|
||||
|
@ -181,12 +181,12 @@ public class UnassignProcedure extends RegionTransitionProcedure {
|
|||
|
||||
// Add the close region operation to the server dispatch queue.
|
||||
if (!addToRemoteDispatcher(env, regionNode.getRegionLocation())) {
|
||||
// If addToRemoteDispatcher fails, it calls the callback #remoteCallFailed.
|
||||
// If addToRemoteDispatcher fails, it calls #remoteCallFailed which
|
||||
// does all cleanup.
|
||||
}
|
||||
|
||||
// We always return true, even if we fail dispatch because addToRemoteDispatcher
|
||||
// failure processing sets state back to REGION_TRANSITION_QUEUE so we come back in here again
|
||||
// to probably go into the clause after waitOnServerCrashProcedure;
|
||||
// failure processing sets state back to REGION_TRANSITION_QUEUE so we try again;
|
||||
// i.e. return true to keep the Procedure running; it has been reset to startover.
|
||||
return true;
|
||||
}
|
||||
|
@ -218,15 +218,13 @@ public class UnassignProcedure extends RegionTransitionProcedure {
|
|||
}
|
||||
|
||||
@Override
|
||||
protected boolean remoteCallFailed(final MasterProcedureEnv env, final RegionStateNode regionNode,
|
||||
protected void remoteCallFailed(final MasterProcedureEnv env, final RegionStateNode regionNode,
|
||||
final IOException exception) {
|
||||
// TODO: Is there on-going rpc to cleanup?
|
||||
if (exception instanceof ServerCrashException) {
|
||||
// This exception comes from ServerCrashProcedure after log splitting.
|
||||
// SCP found this region as a RIT. Its call into here says it is ok to let this procedure go
|
||||
// on to a complete close now. This will release lock on this region so subsequent action on
|
||||
// region can succeed; e.g. the assign that follows this unassign when a move (w/o wait on SCP
|
||||
// the assign could run w/o logs being split so data loss).
|
||||
// It is ok to let this procedure go on to complete close now.
|
||||
// This will release lock on this region so the subsequent assign can succeed.
|
||||
try {
|
||||
reportTransition(env, regionNode, TransitionCode.CLOSED, HConstants.NO_SEQNUM);
|
||||
} catch (UnexpectedStateException e) {
|
||||
|
@ -239,22 +237,19 @@ public class UnassignProcedure extends RegionTransitionProcedure {
|
|||
// TODO
|
||||
// RS is aborting, we cannot offline the region since the region may need to do WAL
|
||||
// recovery. Until we see the RS expiration, we should retry.
|
||||
// TODO: This should be suspend like the below where we call expire on server?
|
||||
LOG.info("Ignoring; waiting on ServerCrashProcedure", exception);
|
||||
// serverCrashed.set(true);
|
||||
} else if (exception instanceof NotServingRegionException) {
|
||||
LOG.info("IS THIS OK? ANY LOGS TO REPLAY; ACTING AS THOUGH ALL GOOD " + regionNode,
|
||||
exception);
|
||||
LOG.info("IS THIS OK? ANY LOGS TO REPLAY; ACTING AS THOUGH ALL GOOD " + regionNode, exception);
|
||||
setTransitionState(RegionTransitionState.REGION_TRANSITION_FINISH);
|
||||
} else {
|
||||
LOG.warn("Expiring server " + this + "; " + regionNode.toShortString() +
|
||||
", exception=" + exception);
|
||||
// TODO: kill the server in case we get an exception we are not able to handle
|
||||
LOG.warn("Killing server; unexpected exception; " +
|
||||
this + "; " + regionNode.toShortString() +
|
||||
" exception=" + exception);
|
||||
env.getMasterServices().getServerManager().expireServer(regionNode.getRegionLocation());
|
||||
// Return false so this procedure stays in suspended state. It will be woken up by a
|
||||
// ServerCrashProcedure when it notices this RIT.
|
||||
// TODO: Add a SCP as a new subprocedure that we now come to depend on.
|
||||
return false;
|
||||
serverCrashed.set(true);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -118,7 +118,7 @@ public class DisableTableProcedure
|
|||
postDisable(env, state);
|
||||
return Flow.NO_MORE_STATE;
|
||||
default:
|
||||
throw new UnsupportedOperationException("Unhandled state=" + state);
|
||||
throw new UnsupportedOperationException("unhandled state=" + state);
|
||||
}
|
||||
} catch (IOException e) {
|
||||
if (isRollbackSupported(state)) {
|
||||
|
@ -147,7 +147,7 @@ public class DisableTableProcedure
|
|||
}
|
||||
|
||||
// The delete doesn't have a rollback. The execution will succeed, at some point.
|
||||
throw new UnsupportedOperationException("Unhandled state=" + state);
|
||||
throw new UnsupportedOperationException("unhandled state=" + state);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -197,7 +197,7 @@ public class RSProcedureDispatcher
|
|||
}
|
||||
|
||||
// trying to send the request elsewhere instead
|
||||
LOG.warn(String.format("Failed dispatch to server=%s try=%d",
|
||||
LOG.warn(String.format("the request should be tried elsewhere instead; server=%s try=%d",
|
||||
serverName, numberOfAttemptsSoFar), e);
|
||||
return false;
|
||||
}
|
||||
|
|
|
@ -23,8 +23,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
|||
|
||||
/**
|
||||
* Passed as Exception by {@link ServerCrashProcedure}
|
||||
* notifying on-going RIT that server has failed. This exception is less an error-condition than
|
||||
* it is a signal to waiting procedures that they can now proceed.
|
||||
* notifying on-going RIT that server has failed.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
@SuppressWarnings("serial")
|
||||
|
|
|
@ -356,8 +356,7 @@ implements ServerProcedureInterface {
|
|||
* Handle any outstanding RIT that are up against this.serverName, the crashed server.
|
||||
* Notify them of crash. Remove assign entries from the passed in <code>regions</code>
|
||||
* otherwise we have two assigns going on and they will fight over who has lock.
|
||||
* Notify Unassigns. If unable to unassign because server went away, unassigns block waiting
|
||||
* on the below callback from a ServerCrashProcedure before proceeding.
|
||||
* Notify Unassigns also.
|
||||
* @param env
|
||||
* @param regions Regions that were on crashed server
|
||||
*/
|
||||
|
|
|
@ -403,16 +403,12 @@ public class TestSplitTransactionOnCluster {
|
|||
for (HRegion d: daughters) {
|
||||
LOG.info("Regions after crash: " + d);
|
||||
}
|
||||
if (daughters.size() != regions.size()) {
|
||||
LOG.info("Daughters=" + daughters.size() + ", regions=" + regions.size());
|
||||
}
|
||||
assertEquals(daughters.size(), regions.size());
|
||||
for (HRegion r: regions) {
|
||||
LOG.info("Regions post crash " + r + ", contains=" + daughters.contains(r));
|
||||
LOG.info("Regions post crash " + r);
|
||||
assertTrue("Missing region post crash " + r, daughters.contains(r));
|
||||
}
|
||||
} finally {
|
||||
LOG.info("EXITING");
|
||||
admin.setBalancerRunning(true, false);
|
||||
cluster.getMaster().setCatalogJanitorEnabled(true);
|
||||
t.close();
|
||||
|
@ -803,7 +799,7 @@ public class TestSplitTransactionOnCluster {
|
|||
throws IOException, InterruptedException {
|
||||
this.admin.splitRegion(hri.getRegionName());
|
||||
for (int i = 0; this.cluster.getRegions(hri.getTable()).size() <= regionCount && i < 60; i++) {
|
||||
LOG.debug("Waiting on region " + hri.getRegionNameAsString() + " to split");
|
||||
LOG.debug("Waiting on region to split");
|
||||
Thread.sleep(2000);
|
||||
}
|
||||
|
||||
|
@ -831,12 +827,10 @@ public class TestSplitTransactionOnCluster {
|
|||
// the table region serving server.
|
||||
int metaServerIndex = cluster.getServerWithMeta();
|
||||
assertTrue(metaServerIndex == -1); // meta is on master now
|
||||
HRegionServer metaRegionServer = cluster.getRegionServer(metaServerIndex);
|
||||
HRegionServer metaRegionServer = cluster.getMaster();
|
||||
int tableRegionIndex = cluster.getServerWith(hri.getRegionName());
|
||||
assertTrue(tableRegionIndex != -1);
|
||||
HRegionServer tableRegionServer = cluster.getRegionServer(tableRegionIndex);
|
||||
LOG.info("MetaRegionServer=" + metaRegionServer.getServerName() +
|
||||
", other=" + tableRegionServer.getServerName());
|
||||
if (metaRegionServer.getServerName().equals(tableRegionServer.getServerName())) {
|
||||
HRegionServer hrs = getOtherRegionServer(cluster, metaRegionServer);
|
||||
assertNotNull(hrs);
|
||||
|
@ -854,8 +848,8 @@ public class TestSplitTransactionOnCluster {
|
|||
tableRegionIndex + " and metaServerIndex=" + metaServerIndex);
|
||||
Thread.sleep(100);
|
||||
}
|
||||
assertTrue("Region not moved off hbase:meta server, tableRegionIndex=" + tableRegionIndex,
|
||||
tableRegionIndex != -1 && tableRegionIndex != metaServerIndex);
|
||||
assertTrue("Region not moved off hbase:meta server", tableRegionIndex != -1
|
||||
&& tableRegionIndex != metaServerIndex);
|
||||
// Verify for sure table region is not on same server as hbase:meta
|
||||
tableRegionIndex = cluster.getServerWith(hri.getRegionName());
|
||||
assertTrue(tableRegionIndex != -1);
|
||||
|
|
Loading…
Reference in New Issue