HBASE-21017 Revisit the expected states for open/close
This commit is contained in:
parent
9b7c530b6c
commit
fc21dc854b
|
@ -552,7 +552,7 @@ public class AssignmentManager implements ServerListener {
|
|||
TransitRegionStateProcedure proc;
|
||||
regionNode.lock();
|
||||
try {
|
||||
preTransitCheck(regionNode, RegionStates.STATES_EXPECTED_ON_OPEN);
|
||||
preTransitCheck(regionNode, STATES_EXPECTED_ON_ASSIGN);
|
||||
proc = TransitRegionStateProcedure.assign(getProcedureEnvironment(), regionInfo, sn);
|
||||
regionNode.setProcedure(proc);
|
||||
} finally {
|
||||
|
@ -573,7 +573,7 @@ public class AssignmentManager implements ServerListener {
|
|||
TransitRegionStateProcedure proc;
|
||||
regionNode.lock();
|
||||
try {
|
||||
preTransitCheck(regionNode, RegionStates.STATES_EXPECTED_ON_CLOSE);
|
||||
preTransitCheck(regionNode, STATES_EXPECTED_ON_UNASSIGN_OR_MOVE);
|
||||
proc = TransitRegionStateProcedure.unassign(getProcedureEnvironment(), regionInfo);
|
||||
regionNode.setProcedure(proc);
|
||||
} finally {
|
||||
|
@ -591,7 +591,7 @@ public class AssignmentManager implements ServerListener {
|
|||
TransitRegionStateProcedure proc;
|
||||
regionNode.lock();
|
||||
try {
|
||||
preTransitCheck(regionNode, RegionStates.STATES_EXPECTED_ON_CLOSE);
|
||||
preTransitCheck(regionNode, STATES_EXPECTED_ON_UNASSIGN_OR_MOVE);
|
||||
regionNode.checkOnline();
|
||||
proc = TransitRegionStateProcedure.move(getProcedureEnvironment(), regionInfo, targetServer);
|
||||
regionNode.setProcedure(proc);
|
||||
|
@ -1410,6 +1410,35 @@ public class AssignmentManager implements ServerListener {
|
|||
return regionState != null ? regionState.getRegionInfo() : null;
|
||||
}
|
||||
|
||||
// ============================================================================================
|
||||
// Expected states on region state transition.
|
||||
// Notice that there is expected states for transiting to OPENING state, this is because SCP.
|
||||
// See the comments in regionOpening method for more details.
|
||||
// ============================================================================================
|
||||
private static final State[] STATES_EXPECTED_ON_OPEN = {
|
||||
State.OPENING, // Normal case
|
||||
State.OPEN // Retrying
|
||||
};
|
||||
|
||||
private static final State[] STATES_EXPECTED_ON_CLOSING = {
|
||||
State.OPEN, // Normal case
|
||||
State.CLOSING, // Retrying
|
||||
State.SPLITTING, // Offline the split parent
|
||||
State.MERGING // Offline the merge parents
|
||||
};
|
||||
|
||||
private static final State[] STATES_EXPECTED_ON_CLOSED = {
|
||||
State.CLOSING, // Normal case
|
||||
State.CLOSED // Retrying
|
||||
};
|
||||
|
||||
// This is for manually scheduled region assign, can add other states later if we find out other
|
||||
// usages
|
||||
private static final State[] STATES_EXPECTED_ON_ASSIGN = { State.CLOSED, State.OFFLINE };
|
||||
|
||||
// We only allow unassign or move a region which is in OPEN state.
|
||||
private static final State[] STATES_EXPECTED_ON_UNASSIGN_OR_MOVE = { State.OPEN };
|
||||
|
||||
// ============================================================================================
|
||||
// Region Status update
|
||||
// Should only be called in TransitRegionStateProcedure
|
||||
|
@ -1432,7 +1461,10 @@ public class AssignmentManager implements ServerListener {
|
|||
|
||||
// should be called within the synchronized block of RegionStateNode
|
||||
void regionOpening(RegionStateNode regionNode) throws IOException {
|
||||
transitStateAndUpdate(regionNode, State.OPENING, RegionStates.STATES_EXPECTED_ON_OPEN);
|
||||
// As in SCP, for performance reason, there is no TRSP attached with this region, we will not
|
||||
// update the region state, which means that the region could be in any state when we want to
|
||||
// assign it after a RS crash. So here we do not pass the expectedStates parameter.
|
||||
transitStateAndUpdate(regionNode, State.OPENING);
|
||||
regionStates.addRegionToServer(regionNode);
|
||||
// update the operation count metrics
|
||||
metrics.incrementOperationCounter();
|
||||
|
@ -1468,7 +1500,7 @@ public class AssignmentManager implements ServerListener {
|
|||
void regionOpened(RegionStateNode regionNode) throws IOException {
|
||||
// TODO: OPENING Updates hbase:meta too... we need to do both here and there?
|
||||
// That is a lot of hbase:meta writing.
|
||||
transitStateAndUpdate(regionNode, State.OPEN, RegionStates.STATES_EXPECTED_ON_OPEN);
|
||||
transitStateAndUpdate(regionNode, State.OPEN, STATES_EXPECTED_ON_OPEN);
|
||||
RegionInfo hri = regionNode.getRegionInfo();
|
||||
if (isMetaRegion(hri)) {
|
||||
// Usually we'd set a table ENABLED at this stage but hbase:meta is ALWAYs enabled, it
|
||||
|
@ -1483,8 +1515,7 @@ public class AssignmentManager implements ServerListener {
|
|||
|
||||
// should be called within the synchronized block of RegionStateNode
|
||||
void regionClosing(RegionStateNode regionNode) throws IOException {
|
||||
transitStateAndUpdate(regionNode, State.CLOSING, RegionStates.STATES_EXPECTED_ON_CLOSE);
|
||||
regionStateStore.updateRegionLocation(regionNode);
|
||||
transitStateAndUpdate(regionNode, State.CLOSING, STATES_EXPECTED_ON_CLOSING);
|
||||
|
||||
RegionInfo hri = regionNode.getRegionInfo();
|
||||
// Set meta has not initialized early. so people trying to create/edit tables will wait
|
||||
|
@ -1502,8 +1533,13 @@ public class AssignmentManager implements ServerListener {
|
|||
void regionClosed(RegionStateNode regionNode, boolean normally) throws IOException {
|
||||
RegionState.State state = regionNode.getState();
|
||||
ServerName regionLocation = regionNode.getRegionLocation();
|
||||
regionNode.transitionState(normally ? State.CLOSED : State.ABNORMALLY_CLOSED,
|
||||
RegionStates.STATES_EXPECTED_ON_CLOSE);
|
||||
if (normally) {
|
||||
regionNode.transitionState(State.CLOSED, STATES_EXPECTED_ON_CLOSED);
|
||||
} else {
|
||||
// For SCP
|
||||
regionNode.transitionState(State.ABNORMALLY_CLOSED);
|
||||
}
|
||||
regionNode.setRegionLocation(null);
|
||||
boolean succ = false;
|
||||
try {
|
||||
regionStateStore.updateRegionLocation(regionNode);
|
||||
|
@ -1517,7 +1553,6 @@ public class AssignmentManager implements ServerListener {
|
|||
}
|
||||
if (regionLocation != null) {
|
||||
regionNode.setLastHost(regionLocation);
|
||||
regionNode.setRegionLocation(null);
|
||||
regionStates.removeRegionFromServer(regionLocation, regionNode);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.master.assignment;
|
|||
import java.io.IOException;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||
import org.apache.hadoop.hbase.master.RegionState;
|
||||
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
|
||||
import org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher.RegionCloseOperation;
|
||||
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
|
||||
|
@ -79,4 +80,9 @@ public class CloseRegionProcedure extends RegionRemoteProcedureBase {
|
|||
assignCandidate = ProtobufUtil.toServerName(data.getAssignCandidate());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean shouldDispatch(RegionStateNode regionNode) {
|
||||
return !regionNode.isInState(RegionState.State.CLOSED, RegionState.State.ABNORMALLY_CLOSED);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.master.assignment;
|
|||
import java.io.IOException;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||
import org.apache.hadoop.hbase.master.RegionState;
|
||||
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
|
||||
import org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher.RegionOpenOperation;
|
||||
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
|
||||
|
@ -64,4 +65,9 @@ public class OpenRegionProcedure extends RegionRemoteProcedureBase {
|
|||
super.deserializeStateData(serializer);
|
||||
serializer.deserialize(OpenRegionProcedureStateData.class);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean shouldDispatch(RegionStateNode regionNode) {
|
||||
return !regionNode.isInState(RegionState.State.OPEN);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -77,16 +77,16 @@ public abstract class RegionRemoteProcedureBase extends Procedure<MasterProcedur
|
|||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
private ProcedureEvent<?> getRegionEvent(MasterProcedureEnv env) {
|
||||
return env.getAssignmentManager().getRegionStates().getOrCreateRegionStateNode(region)
|
||||
.getProcedureEvent();
|
||||
private RegionStateNode getRegionNode(MasterProcedureEnv env) {
|
||||
return env.getAssignmentManager().getRegionStates().getRegionStateNode(region);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void remoteCallFailed(MasterProcedureEnv env, ServerName remote,
|
||||
IOException exception) {
|
||||
ProcedureEvent<?> event = getRegionEvent(env);
|
||||
synchronized (event) {
|
||||
public void remoteCallFailed(MasterProcedureEnv env, ServerName remote, IOException exception) {
|
||||
RegionStateNode regionNode = getRegionNode(env);
|
||||
regionNode.lock();
|
||||
try {
|
||||
ProcedureEvent<?> event = regionNode.getProcedureEvent();
|
||||
if (event.isReady()) {
|
||||
LOG.warn(
|
||||
"The procedure event of procedure {} for region {} to server {} is not suspended, " +
|
||||
|
@ -97,6 +97,8 @@ public abstract class RegionRemoteProcedureBase extends Procedure<MasterProcedur
|
|||
LOG.warn("The remote operation {} for region {} to server {} failed", this, region,
|
||||
targetServer, exception);
|
||||
event.wake(env.getProcedureScheduler());
|
||||
} finally {
|
||||
regionNode.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -115,6 +117,17 @@ public abstract class RegionRemoteProcedureBase extends Procedure<MasterProcedur
|
|||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Check whether we still need to make the call to RS.
|
||||
* <p/>
|
||||
* Usually this will not happen if we do not allow assigning a already onlined region. But if we
|
||||
* have something wrong in the RSProcedureDispatcher, where we have already sent the request to
|
||||
* RS, but then we tell the upper layer the remote call is failed due to rpc timeout or connection
|
||||
* closed or anything else, then this issue can still happen. So here we add a check to make it
|
||||
* more robust.
|
||||
*/
|
||||
protected abstract boolean shouldDispatch(RegionStateNode regionNode);
|
||||
|
||||
@Override
|
||||
protected Procedure<MasterProcedureEnv>[] execute(MasterProcedureEnv env)
|
||||
throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
|
||||
|
@ -122,8 +135,15 @@ public abstract class RegionRemoteProcedureBase extends Procedure<MasterProcedur
|
|||
// we are done, the parent procedure will check whether we are succeeded.
|
||||
return null;
|
||||
}
|
||||
ProcedureEvent<?> event = getRegionEvent(env);
|
||||
synchronized (event) {
|
||||
RegionStateNode regionNode = getRegionNode(env);
|
||||
regionNode.lock();
|
||||
try {
|
||||
if (!shouldDispatch(regionNode)) {
|
||||
return null;
|
||||
}
|
||||
// The code which wakes us up also needs to lock the RSN so here we do not need to synchronize
|
||||
// on the event.
|
||||
ProcedureEvent<?> event = regionNode.getProcedureEvent();
|
||||
try {
|
||||
env.getRemoteDispatcher().addOperationToNode(targetServer, this);
|
||||
} catch (FailedRemoteDispatchException e) {
|
||||
|
@ -136,6 +156,8 @@ public abstract class RegionRemoteProcedureBase extends Procedure<MasterProcedur
|
|||
event.suspend();
|
||||
event.suspendIfNotReady(this);
|
||||
throw new ProcedureSuspendedException();
|
||||
} finally {
|
||||
regionNode.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -55,22 +55,6 @@ import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesti
|
|||
public class RegionStates {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(RegionStates.class);
|
||||
|
||||
// TODO: need to be more specific, i.e, OPENING vs. OPEN, CLOSING vs. CLOSED.
|
||||
static final State[] STATES_EXPECTED_ON_OPEN = new State[] {
|
||||
State.OPEN, // State may already be OPEN if we died after receiving the OPEN from regionserver
|
||||
// but before complete finish of AssignProcedure. HBASE-20100.
|
||||
State.OFFLINE, State.CLOSED, State.ABNORMALLY_CLOSED, // disable/offline
|
||||
State.SPLITTING, // ServerCrashProcedure
|
||||
State.OPENING, State.FAILED_OPEN, // already in-progress (retrying)
|
||||
State.MERGED, State.SPLITTING_NEW
|
||||
};
|
||||
|
||||
static final State[] STATES_EXPECTED_ON_CLOSE = new State[] {
|
||||
State.SPLITTING, State.MERGING, State.OPENING, // ServerCrashProcedure
|
||||
State.OPEN, // enabled/open
|
||||
State.CLOSING // already in-progress (retrying)
|
||||
};
|
||||
|
||||
// This comparator sorts the RegionStates by time stamp then Region name.
|
||||
// Comparing by timestamp alone can lead us to discard different RegionStates that happen
|
||||
// to share a timestamp.
|
||||
|
|
|
@ -18,33 +18,26 @@
|
|||
package org.apache.hadoop.hbase.client;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.HColumnDescriptor;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||
import org.apache.hadoop.hbase.MetaTableAccessor;
|
||||
import org.apache.hadoop.hbase.MiniHBaseCluster;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
|
||||
import org.apache.hadoop.hbase.coprocessor.MasterCoprocessor;
|
||||
import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
|
||||
import org.apache.hadoop.hbase.coprocessor.MasterObserver;
|
||||
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
|
||||
import org.apache.hadoop.hbase.master.HMaster;
|
||||
import org.apache.hadoop.hbase.testclassification.MasterTests;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.JVMClusterUtil;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.ClassRule;
|
||||
|
@ -55,10 +48,6 @@ import org.junit.rules.TestName;
|
|||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hbase.thirdparty.com.google.common.base.Predicate;
|
||||
import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
|
||||
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
|
||||
|
||||
@Category({ MasterTests.class, MediumTests.class })
|
||||
public class TestEnableTable {
|
||||
|
||||
|
@ -85,63 +74,6 @@ public class TestEnableTable {
|
|||
TEST_UTIL.shutdownMiniCluster();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEnableTableWithNoRegionServers() throws Exception {
|
||||
final TableName tableName = TableName.valueOf(name.getMethodName());
|
||||
final MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
|
||||
final HMaster m = cluster.getMaster();
|
||||
final Admin admin = TEST_UTIL.getAdmin();
|
||||
final HTableDescriptor desc = new HTableDescriptor(tableName);
|
||||
desc.addFamily(new HColumnDescriptor(FAMILYNAME));
|
||||
admin.createTable(desc);
|
||||
admin.disableTable(tableName);
|
||||
TEST_UTIL.waitTableDisabled(tableName.getName());
|
||||
|
||||
admin.enableTable(tableName);
|
||||
TEST_UTIL.waitTableEnabled(tableName);
|
||||
// disable once more
|
||||
admin.disableTable(tableName);
|
||||
|
||||
TEST_UTIL.waitUntilNoRegionsInTransition(60000);
|
||||
// now stop region servers
|
||||
JVMClusterUtil.RegionServerThread rs = cluster.getRegionServerThreads().get(0);
|
||||
rs.getRegionServer().stop("stop");
|
||||
cluster.waitForRegionServerToStop(rs.getRegionServer().getServerName(), 10000);
|
||||
|
||||
// We used to enable the table here but AMv2 would hang waiting on a RS to check-in.
|
||||
// Revisit.
|
||||
|
||||
JVMClusterUtil.RegionServerThread rs2 = cluster.startRegionServer();
|
||||
cluster.waitForRegionServerToStart(rs2.getRegionServer().getServerName().getHostname(),
|
||||
rs2.getRegionServer().getServerName().getPort(), 60000);
|
||||
|
||||
LOG.debug("Now enabling table " + tableName);
|
||||
admin.enableTable(tableName);
|
||||
assertTrue(admin.isTableEnabled(tableName));
|
||||
|
||||
List<HRegionInfo> regions = TEST_UTIL.getAdmin().getTableRegions(tableName);
|
||||
assertEquals(1, regions.size());
|
||||
for (HRegionInfo region : regions) {
|
||||
TEST_UTIL.getAdmin().assign(region.getEncodedNameAsBytes());
|
||||
}
|
||||
LOG.debug("Waiting for table assigned " + tableName);
|
||||
TEST_UTIL.waitUntilAllRegionsAssigned(tableName);
|
||||
List<HRegionInfo> onlineRegions = admin.getOnlineRegions(
|
||||
rs2.getRegionServer().getServerName());
|
||||
ArrayList<HRegionInfo> tableRegions = filterTableRegions(tableName, onlineRegions);
|
||||
assertEquals(1, tableRegions.size());
|
||||
}
|
||||
|
||||
private ArrayList<HRegionInfo> filterTableRegions(final TableName tableName,
|
||||
List<HRegionInfo> onlineRegions) {
|
||||
return Lists.newArrayList(Iterables.filter(onlineRegions, new Predicate<HRegionInfo>() {
|
||||
@Override
|
||||
public boolean apply(HRegionInfo input) {
|
||||
return input.getTable().equals(tableName);
|
||||
}
|
||||
}));
|
||||
}
|
||||
|
||||
/**
|
||||
* We were only clearing rows that had a hregioninfo column in hbase:meta. Mangled rows that
|
||||
* were missing the hregioninfo because of error were being left behind messing up any
|
||||
|
|
Loading…
Reference in New Issue