HBASE-22365 Region may be opened on two RegionServers
This commit is contained in:
parent
b8365e5110
commit
62ad94c2b5
|
@ -213,6 +213,9 @@ public class AssignmentManager {
|
||||||
try {
|
try {
|
||||||
regionNode.setRegionLocation(regionState.getServerName());
|
regionNode.setRegionLocation(regionState.getServerName());
|
||||||
regionNode.setState(regionState.getState());
|
regionNode.setState(regionState.getState());
|
||||||
|
if (regionNode.getProcedure() != null) {
|
||||||
|
regionNode.getProcedure().stateLoaded(this, regionNode);
|
||||||
|
}
|
||||||
setMetaAssigned(regionState.getRegion(), regionState.getState() == State.OPEN);
|
setMetaAssigned(regionState.getRegion(), regionState.getState() == State.OPEN);
|
||||||
} finally {
|
} finally {
|
||||||
regionNode.unlock();
|
regionNode.unlock();
|
||||||
|
@ -235,14 +238,12 @@ public class AssignmentManager {
|
||||||
TransitRegionStateProcedure existingProc = regionNode.getProcedure();
|
TransitRegionStateProcedure existingProc = regionNode.getProcedure();
|
||||||
if (existingProc != null) {
|
if (existingProc != null) {
|
||||||
// This is possible, as we will detach the procedure from the RSN before we
|
// This is possible, as we will detach the procedure from the RSN before we
|
||||||
// actually finish the procedure. This is because that, we will update the region state
|
// actually finish the procedure. This is because that, we will detach the TRSP from the RSN
|
||||||
// directly in the reportTransition method for TRSP, and theoretically the region transition
|
// during execution, at that time, the procedure has not been marked as done in the pv2
|
||||||
// has been done, so we need to detach the procedure from the RSN. But actually the
|
// framework yet, so it is possible that we schedule a new TRSP immediately and when
|
||||||
// procedure has not been marked as done in the pv2 framework yet, so it is possible that we
|
// arriving here, we will find out that there are multiple TRSPs for the region. But we can
|
||||||
// schedule a new TRSP immediately and when arriving here, we will find out that there are
|
// make sure that, only the last one can take the charge, the previous ones should have all
|
||||||
// multiple TRSPs for the region. But we can make sure that, only the last one can take the
|
// been finished already. So here we will compare the proc id, the greater one will win.
|
||||||
// charge, the previous ones should have all been finished already.
|
|
||||||
// So here we will compare the proc id, the greater one will win.
|
|
||||||
if (existingProc.getProcId() < proc.getProcId()) {
|
if (existingProc.getProcId() < proc.getProcId()) {
|
||||||
// the new one wins, unset and set it to the new one below
|
// the new one wins, unset and set it to the new one below
|
||||||
regionNode.unsetProcedure(existingProc);
|
regionNode.unsetProcedure(existingProc);
|
||||||
|
@ -1307,7 +1308,6 @@ public class AssignmentManager {
|
||||||
// In any of these cases, state is empty. For now, presume OFFLINE but there are probably
|
// In any of these cases, state is empty. For now, presume OFFLINE but there are probably
|
||||||
// cases where we need to probe more to be sure this correct; TODO informed by experience.
|
// cases where we need to probe more to be sure this correct; TODO informed by experience.
|
||||||
LOG.info(regionInfo.getEncodedName() + " regionState=null; presuming " + State.OFFLINE);
|
LOG.info(regionInfo.getEncodedName() + " regionState=null; presuming " + State.OFFLINE);
|
||||||
|
|
||||||
localState = State.OFFLINE;
|
localState = State.OFFLINE;
|
||||||
}
|
}
|
||||||
RegionStateNode regionNode = regionStates.getOrCreateRegionStateNode(regionInfo);
|
RegionStateNode regionNode = regionStates.getOrCreateRegionStateNode(regionInfo);
|
||||||
|
@ -1328,6 +1328,9 @@ public class AssignmentManager {
|
||||||
} else if (localState == State.OFFLINE || regionInfo.isOffline()) {
|
} else if (localState == State.OFFLINE || regionInfo.isOffline()) {
|
||||||
regionStates.addToOfflineRegions(regionNode);
|
regionStates.addToOfflineRegions(regionNode);
|
||||||
}
|
}
|
||||||
|
if (regionNode.getProcedure() != null) {
|
||||||
|
regionNode.getProcedure().stateLoaded(AssignmentManager.this, regionNode);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
@ -1489,7 +1492,8 @@ public class AssignmentManager {
|
||||||
|
|
||||||
// ============================================================================================
|
// ============================================================================================
|
||||||
// Region Status update
|
// Region Status update
|
||||||
// Should only be called in TransitRegionStateProcedure
|
// Should only be called in TransitRegionStateProcedure(and related procedures), as the locking
|
||||||
|
// and pre-assumptions are very tricky.
|
||||||
// ============================================================================================
|
// ============================================================================================
|
||||||
private void transitStateAndUpdate(RegionStateNode regionNode, RegionState.State newState,
|
private void transitStateAndUpdate(RegionStateNode regionNode, RegionState.State newState,
|
||||||
RegionState.State... expectedStates) throws IOException {
|
RegionState.State... expectedStates) throws IOException {
|
||||||
|
@ -1518,7 +1522,7 @@ public class AssignmentManager {
|
||||||
metrics.incrementOperationCounter();
|
metrics.incrementOperationCounter();
|
||||||
}
|
}
|
||||||
|
|
||||||
// should be called within the synchronized block of RegionStateNode.
|
// should be called under the RegionStateNode lock
|
||||||
// The parameter 'giveUp' means whether we will try to open the region again, if it is true, then
|
// The parameter 'giveUp' means whether we will try to open the region again, if it is true, then
|
||||||
// we will persist the FAILED_OPEN state into hbase:meta.
|
// we will persist the FAILED_OPEN state into hbase:meta.
|
||||||
void regionFailedOpen(RegionStateNode regionNode, boolean giveUp) throws IOException {
|
void regionFailedOpen(RegionStateNode regionNode, boolean giveUp) throws IOException {
|
||||||
|
@ -1544,24 +1548,7 @@ public class AssignmentManager {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// should be called within the synchronized block of RegionStateNode
|
// should be called under the RegionStateNode lock
|
||||||
void regionOpened(RegionStateNode regionNode) throws IOException {
|
|
||||||
// TODO: OPENING Updates hbase:meta too... we need to do both here and there?
|
|
||||||
// That is a lot of hbase:meta writing.
|
|
||||||
transitStateAndUpdate(regionNode, State.OPEN, STATES_EXPECTED_ON_OPEN);
|
|
||||||
RegionInfo hri = regionNode.getRegionInfo();
|
|
||||||
if (isMetaRegion(hri)) {
|
|
||||||
// Usually we'd set a table ENABLED at this stage but hbase:meta is ALWAYs enabled, it
|
|
||||||
// can't be disabled -- so skip the RPC (besides... enabled is managed by TableStateManager
|
|
||||||
// which is backed by hbase:meta... Avoid setting ENABLED to avoid having to update state
|
|
||||||
// on table that contains state.
|
|
||||||
setMetaAssigned(hri, true);
|
|
||||||
}
|
|
||||||
regionStates.addRegionToServer(regionNode);
|
|
||||||
regionStates.removeFromFailedOpen(hri);
|
|
||||||
}
|
|
||||||
|
|
||||||
// should be called within the synchronized block of RegionStateNode
|
|
||||||
void regionClosing(RegionStateNode regionNode) throws IOException {
|
void regionClosing(RegionStateNode regionNode) throws IOException {
|
||||||
transitStateAndUpdate(regionNode, State.CLOSING, STATES_EXPECTED_ON_CLOSING);
|
transitStateAndUpdate(regionNode, State.CLOSING, STATES_EXPECTED_ON_CLOSING);
|
||||||
|
|
||||||
|
@ -1575,18 +1562,36 @@ public class AssignmentManager {
|
||||||
metrics.incrementOperationCounter();
|
metrics.incrementOperationCounter();
|
||||||
}
|
}
|
||||||
|
|
||||||
// should be called within the synchronized block of RegionStateNode
|
// for open and close, they will first be persist to the procedure store in
|
||||||
// The parameter 'normally' means whether we are closed cleanly, if it is true, then it means that
|
// RegionRemoteProcedureBase. So here we will first change the in memory state as it is considered
|
||||||
// we are closed due to a RS crash.
|
// as succeeded if the persistence to procedure store is succeeded, and then when the
|
||||||
void regionClosed(RegionStateNode regionNode, boolean normally) throws IOException {
|
// RegionRemoteProcedureBase is woken up, we will persist the RegionStateNode to hbase:meta.
|
||||||
|
|
||||||
|
// should be called under the RegionStateNode lock
|
||||||
|
void regionOpenedWithoutPersistingToMeta(RegionStateNode regionNode) throws IOException {
|
||||||
|
regionNode.transitionState(State.OPEN, STATES_EXPECTED_ON_OPEN);
|
||||||
|
RegionInfo regionInfo = regionNode.getRegionInfo();
|
||||||
|
regionStates.addRegionToServer(regionNode);
|
||||||
|
regionStates.removeFromFailedOpen(regionInfo);
|
||||||
|
}
|
||||||
|
|
||||||
|
// should be called under the RegionStateNode lock
|
||||||
|
void regionClosedWithoutPersistingToMeta(RegionStateNode regionNode) throws IOException {
|
||||||
|
ServerName regionLocation = regionNode.getRegionLocation();
|
||||||
|
regionNode.transitionState(State.CLOSED, STATES_EXPECTED_ON_CLOSED);
|
||||||
|
regionNode.setRegionLocation(null);
|
||||||
|
if (regionLocation != null) {
|
||||||
|
regionNode.setLastHost(regionLocation);
|
||||||
|
regionStates.removeRegionFromServer(regionLocation, regionNode);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// should be called under the RegionStateNode lock
|
||||||
|
// for SCP
|
||||||
|
void regionClosedAbnormally(RegionStateNode regionNode) throws IOException {
|
||||||
RegionState.State state = regionNode.getState();
|
RegionState.State state = regionNode.getState();
|
||||||
ServerName regionLocation = regionNode.getRegionLocation();
|
ServerName regionLocation = regionNode.getRegionLocation();
|
||||||
if (normally) {
|
|
||||||
regionNode.transitionState(State.CLOSED, STATES_EXPECTED_ON_CLOSED);
|
|
||||||
} else {
|
|
||||||
// For SCP
|
|
||||||
regionNode.transitionState(State.ABNORMALLY_CLOSED);
|
regionNode.transitionState(State.ABNORMALLY_CLOSED);
|
||||||
}
|
|
||||||
regionNode.setRegionLocation(null);
|
regionNode.setRegionLocation(null);
|
||||||
boolean succ = false;
|
boolean succ = false;
|
||||||
try {
|
try {
|
||||||
|
@ -1605,6 +1610,22 @@ public class AssignmentManager {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void persistToMeta(RegionStateNode regionNode) throws IOException {
|
||||||
|
regionStateStore.updateRegionLocation(regionNode);
|
||||||
|
RegionInfo regionInfo = regionNode.getRegionInfo();
|
||||||
|
if (isMetaRegion(regionInfo) && regionNode.getState() == State.OPEN) {
|
||||||
|
// Usually we'd set a table ENABLED at this stage but hbase:meta is ALWAYs enabled, it
|
||||||
|
// can't be disabled -- so skip the RPC (besides... enabled is managed by TableStateManager
|
||||||
|
// which is backed by hbase:meta... Avoid setting ENABLED to avoid having to update state
|
||||||
|
// on table that contains state.
|
||||||
|
setMetaAssigned(regionInfo, true);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ============================================================================================
|
||||||
|
// The above methods can only be called in TransitRegionStateProcedure(and related procedures)
|
||||||
|
// ============================================================================================
|
||||||
|
|
||||||
public void markRegionAsSplit(final RegionInfo parent, final ServerName serverName,
|
public void markRegionAsSplit(final RegionInfo parent, final ServerName serverName,
|
||||||
final RegionInfo daughterA, final RegionInfo daughterB) throws IOException {
|
final RegionInfo daughterA, final RegionInfo daughterB) throws IOException {
|
||||||
// Update hbase:meta. Parent will be marked offline and split up in hbase:meta.
|
// Update hbase:meta. Parent will be marked offline and split up in hbase:meta.
|
||||||
|
|
|
@ -21,6 +21,7 @@ import java.io.IOException;
|
||||||
import org.apache.hadoop.hbase.ServerName;
|
import org.apache.hadoop.hbase.ServerName;
|
||||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||||
import org.apache.hadoop.hbase.exceptions.UnexpectedStateException;
|
import org.apache.hadoop.hbase.exceptions.UnexpectedStateException;
|
||||||
|
import org.apache.hadoop.hbase.master.RegionState.State;
|
||||||
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
|
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
|
||||||
import org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher.RegionCloseOperation;
|
import org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher.RegionCloseOperation;
|
||||||
import org.apache.hadoop.hbase.procedure2.ProcedureMetrics;
|
import org.apache.hadoop.hbase.procedure2.ProcedureMetrics;
|
||||||
|
@ -90,8 +91,8 @@ public class CloseRegionProcedure extends RegionRemoteProcedureBase {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void reportTransition(RegionStateNode regionNode, TransitionCode transitionCode,
|
protected void checkTransition(RegionStateNode regionNode, TransitionCode transitionCode,
|
||||||
long seqId) throws IOException {
|
long seqId) throws UnexpectedStateException {
|
||||||
if (transitionCode != TransitionCode.CLOSED) {
|
if (transitionCode != TransitionCode.CLOSED) {
|
||||||
throw new UnexpectedStateException("Received report unexpected " + transitionCode +
|
throw new UnexpectedStateException("Received report unexpected " + transitionCode +
|
||||||
" transition, " + regionNode.toShortString() + ", " + this + ", expected CLOSED.");
|
" transition, " + regionNode.toShortString() + ", " + this + ", expected CLOSED.");
|
||||||
|
@ -99,8 +100,19 @@ public class CloseRegionProcedure extends RegionRemoteProcedureBase {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void updateTransition(MasterProcedureEnv env, RegionStateNode regionNode,
|
protected void updateTransitionWithoutPersistingToMeta(MasterProcedureEnv env,
|
||||||
TransitionCode transitionCode, long seqId) throws IOException {
|
RegionStateNode regionNode, TransitionCode transitionCode, long seqId) throws IOException {
|
||||||
env.getAssignmentManager().regionClosed(regionNode, true);
|
assert transitionCode == TransitionCode.CLOSED;
|
||||||
|
env.getAssignmentManager().regionClosedWithoutPersistingToMeta(regionNode);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void restoreSucceedState(AssignmentManager am, RegionStateNode regionNode, long seqId)
|
||||||
|
throws IOException {
|
||||||
|
if (regionNode.getState() == State.CLOSED) {
|
||||||
|
// should have already been persisted, ignore
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
am.regionClosedWithoutPersistingToMeta(regionNode);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,6 +21,7 @@ import java.io.IOException;
|
||||||
import org.apache.hadoop.hbase.ServerName;
|
import org.apache.hadoop.hbase.ServerName;
|
||||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||||
import org.apache.hadoop.hbase.exceptions.UnexpectedStateException;
|
import org.apache.hadoop.hbase.exceptions.UnexpectedStateException;
|
||||||
|
import org.apache.hadoop.hbase.master.RegionState.State;
|
||||||
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
|
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
|
||||||
import org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher.RegionOpenOperation;
|
import org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher.RegionOpenOperation;
|
||||||
import org.apache.hadoop.hbase.procedure2.ProcedureMetrics;
|
import org.apache.hadoop.hbase.procedure2.ProcedureMetrics;
|
||||||
|
@ -77,32 +78,8 @@ public class OpenRegionProcedure extends RegionRemoteProcedureBase {
|
||||||
return env.getAssignmentManager().getAssignmentManagerMetrics().getOpenProcMetrics();
|
return env.getAssignmentManager().getAssignmentManagerMetrics().getOpenProcMetrics();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
private void regionOpenedWithoutPersistingToMeta(AssignmentManager am, RegionStateNode regionNode,
|
||||||
protected void reportTransition(RegionStateNode regionNode, TransitionCode transitionCode,
|
|
||||||
long seqId) throws IOException {
|
|
||||||
switch (transitionCode) {
|
|
||||||
case OPENED:
|
|
||||||
// this is the openSeqNum
|
|
||||||
if (seqId < 0) {
|
|
||||||
throw new UnexpectedStateException("Received report unexpected " + TransitionCode.OPENED +
|
|
||||||
" transition openSeqNum=" + seqId + ", " + regionNode + ", proc=" + this);
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
case FAILED_OPEN:
|
|
||||||
// nothing to check
|
|
||||||
break;
|
|
||||||
default:
|
|
||||||
throw new UnexpectedStateException(
|
|
||||||
"Received report unexpected " + transitionCode + " transition, " +
|
|
||||||
regionNode.toShortString() + ", " + this + ", expected OPENED or FAILED_OPEN.");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected void updateTransition(MasterProcedureEnv env, RegionStateNode regionNode,
|
|
||||||
TransitionCode transitionCode, long openSeqNum) throws IOException {
|
TransitionCode transitionCode, long openSeqNum) throws IOException {
|
||||||
switch (transitionCode) {
|
|
||||||
case OPENED:
|
|
||||||
if (openSeqNum < regionNode.getOpenSeqNum()) {
|
if (openSeqNum < regionNode.getOpenSeqNum()) {
|
||||||
LOG.warn(
|
LOG.warn(
|
||||||
"Received report {} transition from {} for {}, pid={} but the new openSeqNum {}" +
|
"Received report {} transition from {} for {}, pid={} but the new openSeqNum {}" +
|
||||||
|
@ -112,14 +89,49 @@ public class OpenRegionProcedure extends RegionRemoteProcedureBase {
|
||||||
} else {
|
} else {
|
||||||
regionNode.setOpenSeqNum(openSeqNum);
|
regionNode.setOpenSeqNum(openSeqNum);
|
||||||
}
|
}
|
||||||
env.getAssignmentManager().regionOpened(regionNode);
|
am.regionOpenedWithoutPersistingToMeta(regionNode);
|
||||||
break;
|
|
||||||
case FAILED_OPEN:
|
|
||||||
env.getAssignmentManager().regionFailedOpen(regionNode, false);
|
|
||||||
break;
|
|
||||||
default:
|
|
||||||
throw new UnexpectedStateException("Unexpected transition code: " + transitionCode);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void checkTransition(RegionStateNode regionNode, TransitionCode transitionCode,
|
||||||
|
long openSeqNum) throws UnexpectedStateException {
|
||||||
|
switch (transitionCode) {
|
||||||
|
case OPENED:
|
||||||
|
if (openSeqNum < 0) {
|
||||||
|
throw new UnexpectedStateException("Received report unexpected " + TransitionCode.OPENED +
|
||||||
|
" transition openSeqNum=" + openSeqNum + ", " + regionNode + ", proc=" + this);
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
case FAILED_OPEN:
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
throw new UnexpectedStateException(
|
||||||
|
"Received report unexpected " + transitionCode + " transition, " +
|
||||||
|
regionNode.toShortString() + ", " + this + ", expected OPENED or FAILED_OPEN.");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void updateTransitionWithoutPersistingToMeta(MasterProcedureEnv env,
|
||||||
|
RegionStateNode regionNode, TransitionCode transitionCode, long openSeqNum)
|
||||||
|
throws IOException {
|
||||||
|
if (transitionCode == TransitionCode.OPENED) {
|
||||||
|
regionOpenedWithoutPersistingToMeta(env.getAssignmentManager(), regionNode, transitionCode,
|
||||||
|
openSeqNum);
|
||||||
|
} else {
|
||||||
|
assert transitionCode == TransitionCode.FAILED_OPEN;
|
||||||
|
// will not persist to meta if giveUp is false
|
||||||
|
env.getAssignmentManager().regionFailedOpen(regionNode, false);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void restoreSucceedState(AssignmentManager am, RegionStateNode regionNode,
|
||||||
|
long openSeqNum) throws IOException {
|
||||||
|
if (regionNode.getState() == State.OPEN) {
|
||||||
|
// should have already been persisted, ignore
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
regionOpenedWithoutPersistingToMeta(am, regionNode, TransitionCode.OPENED, openSeqNum);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -153,9 +153,13 @@ public abstract class RegionRemoteProcedureBase extends Procedure<MasterProcedur
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
// do some checks to see if the report is valid, without actually updating meta.
|
// do some checks to see if the report is valid
|
||||||
protected abstract void reportTransition(RegionStateNode regionNode,
|
protected abstract void checkTransition(RegionStateNode regionNode, TransitionCode transitionCode,
|
||||||
TransitionCode transitionCode, long seqId) throws IOException;
|
long seqId) throws UnexpectedStateException;
|
||||||
|
|
||||||
|
// change the in memory state of the regionNode, but do not update meta.
|
||||||
|
protected abstract void updateTransitionWithoutPersistingToMeta(MasterProcedureEnv env,
|
||||||
|
RegionStateNode regionNode, TransitionCode transitionCode, long seqId) throws IOException;
|
||||||
|
|
||||||
// A bit strange but the procedure store will throw RuntimeException if we can not persist the
|
// A bit strange but the procedure store will throw RuntimeException if we can not persist the
|
||||||
// state, so upper layer should take care of this...
|
// state, so upper layer should take care of this...
|
||||||
|
@ -175,7 +179,7 @@ public abstract class RegionRemoteProcedureBase extends Procedure<MasterProcedur
|
||||||
throw new UnexpectedStateException("Received report from " + serverName + ", expected " +
|
throw new UnexpectedStateException("Received report from " + serverName + ", expected " +
|
||||||
targetServer + ", " + regionNode + ", proc=" + this);
|
targetServer + ", " + regionNode + ", proc=" + this);
|
||||||
}
|
}
|
||||||
reportTransition(regionNode, transitionCode, seqId);
|
checkTransition(regionNode, transitionCode, seqId);
|
||||||
// this state means we have received the report from RS, does not mean the result is fine, as we
|
// this state means we have received the report from RS, does not mean the result is fine, as we
|
||||||
// may received a FAILED_OPEN.
|
// may received a FAILED_OPEN.
|
||||||
this.state = RegionRemoteProcedureBaseState.REGION_REMOTE_PROCEDURE_REPORT_SUCCEED;
|
this.state = RegionRemoteProcedureBaseState.REGION_REMOTE_PROCEDURE_REPORT_SUCCEED;
|
||||||
|
@ -196,13 +200,24 @@ public abstract class RegionRemoteProcedureBase extends Procedure<MasterProcedur
|
||||||
this.seqId = HConstants.NO_SEQNUM;
|
this.seqId = HConstants.NO_SEQNUM;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
try {
|
||||||
|
updateTransitionWithoutPersistingToMeta(env, regionNode, transitionCode, seqId);
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new AssertionError("should not happen", e);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void serverCrashed(MasterProcedureEnv env, RegionStateNode regionNode, ServerName serverName) {
|
void serverCrashed(MasterProcedureEnv env, RegionStateNode regionNode, ServerName serverName) {
|
||||||
if (state != RegionRemoteProcedureBaseState.REGION_REMOTE_PROCEDURE_DISPATCH) {
|
if (state == RegionRemoteProcedureBaseState.REGION_REMOTE_PROCEDURE_SERVER_CRASH) {
|
||||||
// should be a retry
|
// should be a retry
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
RegionRemoteProcedureBaseState oldState = state;
|
||||||
|
// it is possible that the state is in REGION_REMOTE_PROCEDURE_SERVER_CRASH, think of this
|
||||||
|
// sequence
|
||||||
|
// 1. region is open on the target server and the above reportTransition call is succeeded
|
||||||
|
// 2. before we are woken up and update the meta, the target server crashes, and then we arrive
|
||||||
|
// here
|
||||||
this.state = RegionRemoteProcedureBaseState.REGION_REMOTE_PROCEDURE_SERVER_CRASH;
|
this.state = RegionRemoteProcedureBaseState.REGION_REMOTE_PROCEDURE_SERVER_CRASH;
|
||||||
boolean succ = false;
|
boolean succ = false;
|
||||||
try {
|
try {
|
||||||
|
@ -210,7 +225,21 @@ public abstract class RegionRemoteProcedureBase extends Procedure<MasterProcedur
|
||||||
succ = true;
|
succ = true;
|
||||||
} finally {
|
} finally {
|
||||||
if (!succ) {
|
if (!succ) {
|
||||||
this.state = RegionRemoteProcedureBaseState.REGION_REMOTE_PROCEDURE_DISPATCH;
|
this.state = oldState;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
protected abstract void restoreSucceedState(AssignmentManager am, RegionStateNode regionNode,
|
||||||
|
long seqId) throws IOException;
|
||||||
|
|
||||||
|
void stateLoaded(AssignmentManager am, RegionStateNode regionNode) {
|
||||||
|
if (state == RegionRemoteProcedureBaseState.REGION_REMOTE_PROCEDURE_REPORT_SUCCEED) {
|
||||||
|
try {
|
||||||
|
restoreSucceedState(am, regionNode, seqId);
|
||||||
|
} catch (IOException e) {
|
||||||
|
// should not happen as we are just restoring the state
|
||||||
|
throw new AssertionError(e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -224,10 +253,6 @@ public abstract class RegionRemoteProcedureBase extends Procedure<MasterProcedur
|
||||||
getParent(env).unattachRemoteProc(this);
|
getParent(env).unattachRemoteProc(this);
|
||||||
}
|
}
|
||||||
|
|
||||||
// actually update the state to meta
|
|
||||||
protected abstract void updateTransition(MasterProcedureEnv env, RegionStateNode regionNode,
|
|
||||||
TransitionCode transitionCode, long seqId) throws IOException;
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected Procedure<MasterProcedureEnv>[] execute(MasterProcedureEnv env)
|
protected Procedure<MasterProcedureEnv>[] execute(MasterProcedureEnv env)
|
||||||
throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
|
throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
|
||||||
|
@ -254,7 +279,7 @@ public abstract class RegionRemoteProcedureBase extends Procedure<MasterProcedur
|
||||||
throw new ProcedureSuspendedException();
|
throw new ProcedureSuspendedException();
|
||||||
}
|
}
|
||||||
case REGION_REMOTE_PROCEDURE_REPORT_SUCCEED:
|
case REGION_REMOTE_PROCEDURE_REPORT_SUCCEED:
|
||||||
updateTransition(env, regionNode, transitionCode, seqId);
|
env.getAssignmentManager().persistToMeta(regionNode);
|
||||||
unattach(env);
|
unattach(env);
|
||||||
return null;
|
return null;
|
||||||
case REGION_REMOTE_PROCEDURE_DISPATCH_FAIL:
|
case REGION_REMOTE_PROCEDURE_DISPATCH_FAIL:
|
||||||
|
@ -262,7 +287,7 @@ public abstract class RegionRemoteProcedureBase extends Procedure<MasterProcedur
|
||||||
unattach(env);
|
unattach(env);
|
||||||
return null;
|
return null;
|
||||||
case REGION_REMOTE_PROCEDURE_SERVER_CRASH:
|
case REGION_REMOTE_PROCEDURE_SERVER_CRASH:
|
||||||
env.getAssignmentManager().regionClosed(regionNode, false);
|
env.getAssignmentManager().regionClosedAbnormally(regionNode);
|
||||||
unattach(env);
|
unattach(env);
|
||||||
return null;
|
return null;
|
||||||
default:
|
default:
|
||||||
|
|
|
@ -403,7 +403,7 @@ public class TransitRegionStateProcedure
|
||||||
remoteProc.serverCrashed(env, regionNode, serverName);
|
remoteProc.serverCrashed(env, regionNode, serverName);
|
||||||
} else {
|
} else {
|
||||||
// we are in RUNNING state, just update the region state, and we will process it later.
|
// we are in RUNNING state, just update the region state, and we will process it later.
|
||||||
env.getAssignmentManager().regionClosed(regionNode, false);
|
env.getAssignmentManager().regionClosedAbnormally(regionNode);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -416,6 +416,15 @@ public class TransitRegionStateProcedure
|
||||||
this.remoteProc = null;
|
this.remoteProc = null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// will be called after we finish loading the meta entry for this region.
|
||||||
|
// used to change the state of the region node if we have a sub procedure, as we may not persist
|
||||||
|
// the state to meta yet. See the code in RegionRemoteProcedureBase.execute for more details.
|
||||||
|
void stateLoaded(AssignmentManager am, RegionStateNode regionNode) {
|
||||||
|
if (remoteProc != null) {
|
||||||
|
remoteProc.stateLoaded(am, regionNode);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void rollbackState(MasterProcedureEnv env, RegionStateTransitionState state)
|
protected void rollbackState(MasterProcedureEnv env, RegionStateTransitionState state)
|
||||||
throws IOException, InterruptedException {
|
throws IOException, InterruptedException {
|
||||||
|
|
|
@ -0,0 +1,218 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.master.assignment;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertNotNull;
|
||||||
|
import static org.junit.Assert.assertNull;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
import java.util.concurrent.Future;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||||
|
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||||
|
import org.apache.hadoop.hbase.PleaseHoldException;
|
||||||
|
import org.apache.hadoop.hbase.ServerName;
|
||||||
|
import org.apache.hadoop.hbase.StartMiniClusterOption;
|
||||||
|
import org.apache.hadoop.hbase.TableName;
|
||||||
|
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||||
|
import org.apache.hadoop.hbase.master.HMaster;
|
||||||
|
import org.apache.hadoop.hbase.master.MasterServices;
|
||||||
|
import org.apache.hadoop.hbase.master.RegionPlan;
|
||||||
|
import org.apache.hadoop.hbase.master.ServerManager;
|
||||||
|
import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.HRegionServer;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.MasterTests;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||||
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
import org.apache.hadoop.hbase.util.IdLock;
|
||||||
|
import org.apache.zookeeper.KeeperException;
|
||||||
|
import org.junit.AfterClass;
|
||||||
|
import org.junit.BeforeClass;
|
||||||
|
import org.junit.ClassRule;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.experimental.categories.Category;
|
||||||
|
|
||||||
|
import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
|
||||||
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
|
||||||
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Testcase for HBASE-22365.
|
||||||
|
*/
|
||||||
|
@Category({ MasterTests.class, MediumTests.class })
|
||||||
|
public class TestSCPGetRegionsRace {
|
||||||
|
|
||||||
|
@ClassRule
|
||||||
|
public static final HBaseClassTestRule CLASS_RULE =
|
||||||
|
HBaseClassTestRule.forClass(TestSCPGetRegionsRace.class);
|
||||||
|
|
||||||
|
private static final List<ServerName> EXCLUDE_SERVERS = new ArrayList<>();
|
||||||
|
|
||||||
|
private static final class ServerManagerForTest extends ServerManager {
|
||||||
|
|
||||||
|
public ServerManagerForTest(MasterServices master) {
|
||||||
|
super(master);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public List<ServerName> createDestinationServersList() {
|
||||||
|
return super.createDestinationServersList(EXCLUDE_SERVERS);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static CountDownLatch ARRIVE_REPORT;
|
||||||
|
|
||||||
|
private static CountDownLatch RESUME_REPORT;
|
||||||
|
|
||||||
|
private static CountDownLatch ARRIVE_GET;
|
||||||
|
|
||||||
|
private static CountDownLatch RESUME_GET;
|
||||||
|
|
||||||
|
private static final class AssignmentManagerForTest extends AssignmentManager {
|
||||||
|
|
||||||
|
public AssignmentManagerForTest(MasterServices master) {
|
||||||
|
super(master);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ReportRegionStateTransitionResponse reportRegionStateTransition(
|
||||||
|
ReportRegionStateTransitionRequest req) throws PleaseHoldException {
|
||||||
|
if (req.getTransition(0).getTransitionCode() == TransitionCode.CLOSED) {
|
||||||
|
if (ARRIVE_REPORT != null) {
|
||||||
|
ARRIVE_REPORT.countDown();
|
||||||
|
try {
|
||||||
|
RESUME_REPORT.await();
|
||||||
|
RESUME_REPORT = null;
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return super.reportRegionStateTransition(req);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public List<RegionInfo> getRegionsOnServer(ServerName serverName) {
|
||||||
|
List<RegionInfo> regions = super.getRegionsOnServer(serverName);
|
||||||
|
if (ARRIVE_GET != null) {
|
||||||
|
ARRIVE_GET.countDown();
|
||||||
|
try {
|
||||||
|
RESUME_GET.await();
|
||||||
|
RESUME_GET = null;
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return regions;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
public static final class HMasterForTest extends HMaster {
|
||||||
|
|
||||||
|
public HMasterForTest(Configuration conf) throws IOException, KeeperException {
|
||||||
|
super(conf);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected AssignmentManager createAssignmentManager(MasterServices master) {
|
||||||
|
return new AssignmentManagerForTest(master);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected ServerManager createServerManager(MasterServices master) throws IOException {
|
||||||
|
setupClusterConnection();
|
||||||
|
return new ServerManagerForTest(master);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
|
||||||
|
|
||||||
|
private static TableName NAME = TableName.valueOf("Assign");
|
||||||
|
|
||||||
|
private static byte[] CF = Bytes.toBytes("cf");
|
||||||
|
|
||||||
|
@BeforeClass
|
||||||
|
public static void setUp() throws Exception {
|
||||||
|
UTIL.startMiniCluster(StartMiniClusterOption.builder().masterClass(HMasterForTest.class)
|
||||||
|
.numMasters(1).numRegionServers(3).build());
|
||||||
|
UTIL.createTable(NAME, CF);
|
||||||
|
UTIL.waitTableAvailable(NAME);
|
||||||
|
UTIL.getAdmin().balancerSwitch(false, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
@AfterClass
|
||||||
|
public static void tearDown() throws Exception {
|
||||||
|
UTIL.shutdownMiniCluster();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void test() throws Exception {
|
||||||
|
RegionInfo region =
|
||||||
|
Iterables.getOnlyElement(UTIL.getMiniHBaseCluster().getRegions(NAME)).getRegionInfo();
|
||||||
|
HMaster master = UTIL.getMiniHBaseCluster().getMaster();
|
||||||
|
AssignmentManager am = master.getAssignmentManager();
|
||||||
|
RegionStateNode rsn = am.getRegionStates().getRegionStateNode(region);
|
||||||
|
ServerName source = rsn.getRegionLocation();
|
||||||
|
ServerName dest =
|
||||||
|
UTIL.getAdmin().getRegionServers().stream().filter(sn -> !sn.equals(source)).findAny().get();
|
||||||
|
|
||||||
|
ARRIVE_REPORT = new CountDownLatch(1);
|
||||||
|
RESUME_REPORT = new CountDownLatch(1);
|
||||||
|
|
||||||
|
Future<?> future = am.moveAsync(new RegionPlan(region, source, dest));
|
||||||
|
|
||||||
|
ARRIVE_REPORT.await();
|
||||||
|
ARRIVE_REPORT = null;
|
||||||
|
// let's get procedure lock to stop the TRSP
|
||||||
|
IdLock procExecutionLock = master.getMasterProcedureExecutor().getProcExecutionLock();
|
||||||
|
long procId = master.getProcedures().stream()
|
||||||
|
.filter(p -> p instanceof RegionRemoteProcedureBase).findAny().get().getProcId();
|
||||||
|
IdLock.Entry lockEntry = procExecutionLock.getLockEntry(procId);
|
||||||
|
RESUME_REPORT.countDown();
|
||||||
|
|
||||||
|
// kill the source region server
|
||||||
|
ARRIVE_GET = new CountDownLatch(1);
|
||||||
|
RESUME_GET = new CountDownLatch(1);
|
||||||
|
UTIL.getMiniHBaseCluster().killRegionServer(source);
|
||||||
|
|
||||||
|
// wait until we try to get the region list of the region server
|
||||||
|
ARRIVE_GET.await();
|
||||||
|
ARRIVE_GET = null;
|
||||||
|
// release the procedure lock and let the TRSP to finish
|
||||||
|
procExecutionLock.releaseLockEntry(lockEntry);
|
||||||
|
future.get();
|
||||||
|
|
||||||
|
// resume the SCP
|
||||||
|
EXCLUDE_SERVERS.add(dest);
|
||||||
|
RESUME_GET.countDown();
|
||||||
|
// wait until there are no SCPs and TRSPs
|
||||||
|
UTIL.waitFor(60000, () -> master.getProcedures().stream().allMatch(p -> p.isFinished() ||
|
||||||
|
(!(p instanceof ServerCrashProcedure) && !(p instanceof TransitRegionStateProcedure))));
|
||||||
|
|
||||||
|
// assert the region is only on the dest server.
|
||||||
|
HRegionServer rs = UTIL.getMiniHBaseCluster().getRegionServer(dest);
|
||||||
|
assertNotNull(rs.getRegion(region.getEncodedName()));
|
||||||
|
assertNull(UTIL.getOtherRegionServer(rs).getRegion(region.getEncodedName()));
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue