HBASE-21463 The checkOnlineRegionsReport can accidentally complete a TRSP
This commit is contained in:
parent
61f1d9735b
commit
ea3b2dfaeb
|
@ -1916,6 +1916,11 @@ public class ProcedureExecutor<TEnvironment> {
|
|||
return completed.size();
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public IdLock getProcExecutionLock() {
|
||||
return procExecutionLock;
|
||||
}
|
||||
|
||||
// ==========================================================================
|
||||
// Worker Thread
|
||||
// ==========================================================================
|
||||
|
|
|
@ -161,7 +161,8 @@ enum EnableTableState {
|
|||
message EnableTableStateData {
|
||||
required UserInformation user_info = 1;
|
||||
required TableName table_name = 2;
|
||||
required bool skip_table_state_check = 3;
|
||||
// not used any more, always false
|
||||
required bool skip_table_state_check = 3[deprecated=true];
|
||||
}
|
||||
|
||||
enum DisableTableState {
|
||||
|
|
|
@ -2556,7 +2556,7 @@ public class HMaster extends HRegionServer implements MasterServices {
|
|||
// Note: if the procedure throws exception, we will catch it and rethrow.
|
||||
final ProcedurePrepareLatch prepareLatch = ProcedurePrepareLatch.createLatch();
|
||||
submitProcedure(new EnableTableProcedure(procedureExecutor.getEnvironment(),
|
||||
tableName, false, prepareLatch));
|
||||
tableName, prepareLatch));
|
||||
prepareLatch.await();
|
||||
|
||||
getMaster().getMasterCoprocessorHost().postEnableTable(tableName);
|
||||
|
|
|
@ -39,7 +39,6 @@ import org.apache.hadoop.hbase.PleaseHoldException;
|
|||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.UnknownRegionException;
|
||||
import org.apache.hadoop.hbase.YouAreDeadException;
|
||||
import org.apache.hadoop.hbase.client.DoNotRetryRegionException;
|
||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
|
||||
|
@ -162,7 +161,8 @@ public class AssignmentManager implements ServerListener {
|
|||
this(master, new RegionStateStore(master));
|
||||
}
|
||||
|
||||
public AssignmentManager(final MasterServices master, final RegionStateStore stateStore) {
|
||||
@VisibleForTesting
|
||||
AssignmentManager(final MasterServices master, final RegionStateStore stateStore) {
|
||||
this.master = master;
|
||||
this.regionStateStore = stateStore;
|
||||
this.metrics = new MetricsAssignmentManager();
|
||||
|
@ -979,23 +979,26 @@ public class AssignmentManager implements ServerListener {
|
|||
// RS Status update (report online regions) helpers
|
||||
// ============================================================================================
|
||||
/**
|
||||
* the master will call this method when the RS send the regionServerReport().
|
||||
* the report will contains the "online regions".
|
||||
* this method will check the the online regions against the in-memory state of the AM,
|
||||
* if there is a mismatch we will try to fence out the RS with the assumption
|
||||
* that something went wrong on the RS side.
|
||||
* The master will call this method when the RS send the regionServerReport(). The report will
|
||||
* contains the "online regions". This method will check the the online regions against the
|
||||
* in-memory state of the AM, and we will log a warn message if there is a mismatch. This is
|
||||
* because that there is no fencing between the reportRegionStateTransition method and
|
||||
* regionServerReport method, so there could be race and introduce inconsistency here, but
|
||||
* actually there is no problem.
|
||||
* <p/>
|
||||
* Please see HBASE-21421 and HBASE-21463 for more details.
|
||||
*/
|
||||
public void reportOnlineRegions(final ServerName serverName, final Set<byte[]> regionNames)
|
||||
throws YouAreDeadException {
|
||||
if (!isRunning()) return;
|
||||
public void reportOnlineRegions(ServerName serverName, Set<byte[]> regionNames) {
|
||||
if (!isRunning()) {
|
||||
return;
|
||||
}
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("ReportOnlineRegions " + serverName + " regionCount=" + regionNames.size() +
|
||||
", metaLoaded=" + isMetaLoaded() + " " +
|
||||
regionNames.stream().map(element -> Bytes.toStringBinary(element)).
|
||||
collect(Collectors.toList()));
|
||||
LOG.trace("ReportOnlineRegions {} regionCount={}, metaLoaded={} {}", serverName,
|
||||
regionNames.size(), isMetaLoaded(),
|
||||
regionNames.stream().map(Bytes::toStringBinary).collect(Collectors.toList()));
|
||||
}
|
||||
|
||||
final ServerStateNode serverNode = regionStates.getOrCreateServer(serverName);
|
||||
ServerStateNode serverNode = regionStates.getOrCreateServer(serverName);
|
||||
|
||||
synchronized (serverNode) {
|
||||
if (!serverNode.isInState(ServerState.ONLINE)) {
|
||||
|
@ -1003,103 +1006,58 @@ public class AssignmentManager implements ServerListener {
|
|||
return;
|
||||
}
|
||||
}
|
||||
|
||||
if (regionNames.isEmpty()) {
|
||||
// nothing to do if we don't have regions
|
||||
LOG.trace("no online region found on " + serverName);
|
||||
} else if (!isMetaLoaded()) {
|
||||
// if we are still on startup, discard the report unless is from someone holding meta
|
||||
checkOnlineRegionsReportForMeta(serverNode, regionNames);
|
||||
} else {
|
||||
// The Heartbeat updates us of what regions are only. check and verify the state.
|
||||
checkOnlineRegionsReport(serverNode, regionNames);
|
||||
LOG.trace("no online region found on {}", serverName);
|
||||
return;
|
||||
}
|
||||
if (!isMetaLoaded()) {
|
||||
// we are still on startup, skip checking
|
||||
return;
|
||||
}
|
||||
// The Heartbeat tells us of what regions are on the region serve, check the state.
|
||||
checkOnlineRegionsReport(serverNode, regionNames);
|
||||
|
||||
// wake report event
|
||||
wakeServerReportEvent(serverNode);
|
||||
}
|
||||
|
||||
void checkOnlineRegionsReportForMeta(ServerStateNode serverNode, Set<byte[]> regionNames) {
|
||||
try {
|
||||
for (byte[] regionName : regionNames) {
|
||||
final RegionInfo hri = getMetaRegionFromName(regionName);
|
||||
if (hri == null) {
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("Skip online report for region=" + Bytes.toStringBinary(regionName) +
|
||||
" while meta is loading");
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
RegionStateNode regionNode = regionStates.getOrCreateRegionStateNode(hri);
|
||||
LOG.info("META REPORTED: " + regionNode);
|
||||
regionNode.lock();
|
||||
try {
|
||||
if (!reportTransition(regionNode, serverNode, TransitionCode.OPENED, 0)) {
|
||||
LOG.warn("META REPORTED but no procedure found (complete?); set location=" +
|
||||
serverNode.getServerName());
|
||||
regionNode.setRegionLocation(serverNode.getServerName());
|
||||
} else if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("META REPORTED: " + regionNode);
|
||||
}
|
||||
} finally {
|
||||
regionNode.unlock();
|
||||
}
|
||||
// just check and output possible inconsistency, without actually doing anything
|
||||
private void checkOnlineRegionsReport(ServerStateNode serverNode, Set<byte[]> regionNames) {
|
||||
ServerName serverName = serverNode.getServerName();
|
||||
for (byte[] regionName : regionNames) {
|
||||
if (!isRunning()) {
|
||||
return;
|
||||
}
|
||||
} catch (IOException e) {
|
||||
ServerName serverName = serverNode.getServerName();
|
||||
LOG.warn("KILLING " + serverName + ": " + e.getMessage());
|
||||
killRegionServer(serverNode);
|
||||
}
|
||||
}
|
||||
|
||||
void checkOnlineRegionsReport(final ServerStateNode serverNode, final Set<byte[]> regionNames) {
|
||||
final ServerName serverName = serverNode.getServerName();
|
||||
try {
|
||||
for (byte[] regionName: regionNames) {
|
||||
if (!isRunning()) {
|
||||
return;
|
||||
}
|
||||
final RegionStateNode regionNode = regionStates.getRegionStateNodeFromName(regionName);
|
||||
if (regionNode == null) {
|
||||
throw new UnexpectedStateException("Not online: " + Bytes.toStringBinary(regionName));
|
||||
}
|
||||
regionNode.lock();
|
||||
try {
|
||||
if (regionNode.isInState(State.OPENING, State.OPEN)) {
|
||||
if (!regionNode.getRegionLocation().equals(serverName)) {
|
||||
throw new UnexpectedStateException(regionNode.toString() +
|
||||
" reported OPEN on server=" + serverName +
|
||||
" but state has otherwise.");
|
||||
} else if (regionNode.isInState(State.OPENING)) {
|
||||
try {
|
||||
if (!reportTransition(regionNode, serverNode, TransitionCode.OPENED, 0)) {
|
||||
LOG.warn(regionNode.toString() + " reported OPEN on server=" + serverName +
|
||||
" but state has otherwise AND NO procedure is running");
|
||||
}
|
||||
} catch (UnexpectedStateException e) {
|
||||
LOG.warn(regionNode.toString() + " reported unexpteced OPEN: " + e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
} else if (!regionNode.isInState(State.CLOSING, State.SPLITTING)) {
|
||||
long diff = regionNode.getLastUpdate() - EnvironmentEdgeManager.currentTime();
|
||||
if (diff > 1000/*One Second... make configurable if an issue*/) {
|
||||
// So, we can get report that a region is CLOSED or SPLIT because a heartbeat
|
||||
// came in at about same time as a region transition. Make sure there is some
|
||||
// elapsed time between killing remote server.
|
||||
throw new UnexpectedStateException(regionNode.toString() +
|
||||
" reported an unexpected OPEN; time since last update=" + diff);
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
regionNode.unlock();
|
||||
}
|
||||
RegionStateNode regionNode = regionStates.getRegionStateNodeFromName(regionName);
|
||||
if (regionNode == null) {
|
||||
LOG.warn("No region state node for {}, it should already be on {}",
|
||||
Bytes.toStringBinary(regionName), serverName);
|
||||
continue;
|
||||
}
|
||||
regionNode.lock();
|
||||
try {
|
||||
long diff = EnvironmentEdgeManager.currentTime() - regionNode.getLastUpdate();
|
||||
if (regionNode.isInState(State.OPENING, State.OPEN)) {
|
||||
// This is possible as a region server has just closed a region but the region server
|
||||
// report is generated before the closing, but arrive after the closing. Make sure there
|
||||
// is some elapsed time so less false alarms.
|
||||
if (!regionNode.getRegionLocation().equals(serverName) && diff > 1000) {
|
||||
LOG.warn("{} reported OPEN on server={} but state has otherwise", regionNode,
|
||||
serverName);
|
||||
}
|
||||
} else if (!regionNode.isInState(State.CLOSING, State.SPLITTING)) {
|
||||
// So, we can get report that a region is CLOSED or SPLIT because a heartbeat
|
||||
// came in at about same time as a region transition. Make sure there is some
|
||||
// elapsed time so less false alarms.
|
||||
if (diff > 1000) {
|
||||
LOG.warn("{} reported an unexpected OPEN on {}; time since last update={}ms",
|
||||
regionNode, serverName, diff);
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
regionNode.unlock();
|
||||
}
|
||||
} catch (IOException e) {
|
||||
//See HBASE-21421, we can count on reportRegionStateTransition calls
|
||||
//We only log a warming here. It could be a network lag.
|
||||
LOG.warn("Failed to checkOnlineRegionsReport, maybe due to network lag, "
|
||||
+ "if this message continues, be careful of double assign", e);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1905,10 +1863,6 @@ public class AssignmentManager implements ServerListener {
|
|||
wakeServerReportEvent(serverNode);
|
||||
}
|
||||
|
||||
private void killRegionServer(final ServerStateNode serverNode) {
|
||||
master.getServerManager().expireServer(serverNode.getServerName());
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
MasterServices getMaster() {
|
||||
return master;
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
/*
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
|
@ -15,7 +15,6 @@
|
|||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.master.procedure;
|
||||
|
||||
import java.io.IOException;
|
||||
|
@ -52,36 +51,30 @@ public class EnableTableProcedure
|
|||
private static final Logger LOG = LoggerFactory.getLogger(EnableTableProcedure.class);
|
||||
|
||||
private TableName tableName;
|
||||
private boolean skipTableStateCheck;
|
||||
|
||||
private Boolean traceEnabled = null;
|
||||
|
||||
public EnableTableProcedure() {
|
||||
super();
|
||||
}
|
||||
|
||||
/**
|
||||
* Constructor
|
||||
* @param env MasterProcedureEnv
|
||||
* @param tableName the table to operate on
|
||||
* @param skipTableStateCheck whether to check table state
|
||||
*/
|
||||
public EnableTableProcedure(final MasterProcedureEnv env, final TableName tableName,
|
||||
final boolean skipTableStateCheck) {
|
||||
this(env, tableName, skipTableStateCheck, null);
|
||||
public EnableTableProcedure(MasterProcedureEnv env, TableName tableName) {
|
||||
this(env, tableName, null);
|
||||
}
|
||||
|
||||
/**
|
||||
* Constructor
|
||||
* @param env MasterProcedureEnv
|
||||
* @param tableName the table to operate on
|
||||
* @param skipTableStateCheck whether to check table state
|
||||
*/
|
||||
public EnableTableProcedure(final MasterProcedureEnv env, final TableName tableName,
|
||||
final boolean skipTableStateCheck, final ProcedurePrepareLatch syncLatch) {
|
||||
public EnableTableProcedure(MasterProcedureEnv env, TableName tableName,
|
||||
ProcedurePrepareLatch syncLatch) {
|
||||
super(env, syncLatch);
|
||||
this.tableName = tableName;
|
||||
this.skipTableStateCheck = skipTableStateCheck;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -268,29 +261,27 @@ public class EnableTableProcedure
|
|||
}
|
||||
|
||||
@Override
|
||||
protected void serializeStateData(ProcedureStateSerializer serializer)
|
||||
throws IOException {
|
||||
protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
|
||||
super.serializeStateData(serializer);
|
||||
|
||||
// the skipTableStateCheck is false so we still need to set it...
|
||||
@SuppressWarnings("deprecation")
|
||||
MasterProcedureProtos.EnableTableStateData.Builder enableTableMsg =
|
||||
MasterProcedureProtos.EnableTableStateData.newBuilder()
|
||||
.setUserInfo(MasterProcedureUtil.toProtoUserInfo(getUser()))
|
||||
.setTableName(ProtobufUtil.toProtoTableName(tableName))
|
||||
.setSkipTableStateCheck(skipTableStateCheck);
|
||||
MasterProcedureProtos.EnableTableStateData.newBuilder()
|
||||
.setUserInfo(MasterProcedureUtil.toProtoUserInfo(getUser()))
|
||||
.setTableName(ProtobufUtil.toProtoTableName(tableName)).setSkipTableStateCheck(false);
|
||||
|
||||
serializer.serialize(enableTableMsg.build());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void deserializeStateData(ProcedureStateSerializer serializer)
|
||||
throws IOException {
|
||||
protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
|
||||
super.deserializeStateData(serializer);
|
||||
|
||||
MasterProcedureProtos.EnableTableStateData enableTableMsg =
|
||||
serializer.deserialize(MasterProcedureProtos.EnableTableStateData.class);
|
||||
serializer.deserialize(MasterProcedureProtos.EnableTableStateData.class);
|
||||
setUser(MasterProcedureUtil.toUserInfo(enableTableMsg.getUserInfo()));
|
||||
tableName = ProtobufUtil.toTableName(enableTableMsg.getTableName());
|
||||
skipTableStateCheck = enableTableMsg.getSkipTableStateCheck();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -318,7 +309,7 @@ public class EnableTableProcedure
|
|||
if (!MetaTableAccessor.tableExists(env.getMasterServices().getConnection(), tableName)) {
|
||||
setFailure("master-enable-table", new TableNotFoundException(tableName));
|
||||
canTableBeEnabled = false;
|
||||
} else if (!skipTableStateCheck) {
|
||||
} else {
|
||||
// There could be multiple client requests trying to disable or enable
|
||||
// the table at the same time. Ensure only the first request is honored
|
||||
// After that, no other requests can be accepted until the table reaches
|
||||
|
|
|
@ -33,7 +33,6 @@ import org.apache.hadoop.hbase.ServerMetricsBuilder;
|
|||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.TableDescriptors;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.YouAreDeadException;
|
||||
import org.apache.hadoop.hbase.client.ClusterConnection;
|
||||
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
|
||||
import org.apache.hadoop.hbase.client.HConnectionTestingUtility;
|
||||
|
@ -78,7 +77,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionActi
|
|||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionActionResult;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ResultOrException;
|
||||
|
||||
|
||||
/**
|
||||
* A mocked master services.
|
||||
* Tries to fake it. May not always work.
|
||||
|
@ -126,13 +124,9 @@ public class MockMasterServices extends MockNoopMasterServices {
|
|||
@Override
|
||||
protected boolean waitServerReportEvent(ServerName serverName, Procedure proc) {
|
||||
// Make a report with current state of the server 'serverName' before we call wait..
|
||||
SortedSet<byte []> regions = regionsToRegionServers.get(serverName);
|
||||
try {
|
||||
getAssignmentManager().reportOnlineRegions(serverName,
|
||||
regions == null? new HashSet<byte []>(): regions);
|
||||
} catch (YouAreDeadException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
SortedSet<byte[]> regions = regionsToRegionServers.get(serverName);
|
||||
getAssignmentManager().reportOnlineRegions(serverName,
|
||||
regions == null ? new HashSet<byte[]>() : regions);
|
||||
return super.waitServerReportEvent(serverName, proc);
|
||||
}
|
||||
};
|
||||
|
|
|
@ -0,0 +1,188 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.master.assignment;
|
||||
|
||||
import static org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RegionStateTransitionState.REGION_STATE_TRANSITION_CONFIRM_OPENED_VALUE;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.Future;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.PleaseHoldException;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.Put;
|
||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||
import org.apache.hadoop.hbase.client.Table;
|
||||
import org.apache.hadoop.hbase.master.HMaster;
|
||||
import org.apache.hadoop.hbase.master.MasterServices;
|
||||
import org.apache.hadoop.hbase.master.RegionPlan;
|
||||
import org.apache.hadoop.hbase.master.RegionState;
|
||||
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
|
||||
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
|
||||
import org.apache.hadoop.hbase.testclassification.MasterTests;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.IdLock;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse;
|
||||
|
||||
@Category({ MasterTests.class, MediumTests.class })
|
||||
public class TestReportOnlineRegionsRace {
|
||||
|
||||
@ClassRule
|
||||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestReportOnlineRegionsRace.class);
|
||||
|
||||
private static volatile CountDownLatch ARRIVE_RS_REPORT;
|
||||
private static volatile CountDownLatch RESUME_RS_REPORT;
|
||||
private static volatile CountDownLatch FINISH_RS_REPORT;
|
||||
|
||||
private static volatile CountDownLatch RESUME_REPORT_STATE;
|
||||
|
||||
private static final class AssignmentManagerForTest extends AssignmentManager {
|
||||
|
||||
public AssignmentManagerForTest(MasterServices master) {
|
||||
super(master);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void reportOnlineRegions(ServerName serverName, Set<byte[]> regionNames) {
|
||||
if (ARRIVE_RS_REPORT != null) {
|
||||
ARRIVE_RS_REPORT.countDown();
|
||||
try {
|
||||
RESUME_RS_REPORT.await();
|
||||
} catch (InterruptedException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
super.reportOnlineRegions(serverName, regionNames);
|
||||
if (FINISH_RS_REPORT != null) {
|
||||
FINISH_RS_REPORT.countDown();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public ReportRegionStateTransitionResponse reportRegionStateTransition(
|
||||
ReportRegionStateTransitionRequest req) throws PleaseHoldException {
|
||||
if (RESUME_REPORT_STATE != null) {
|
||||
try {
|
||||
RESUME_REPORT_STATE.await();
|
||||
} catch (InterruptedException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
return super.reportRegionStateTransition(req);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public static final class HMasterForTest extends HMaster {
|
||||
|
||||
public HMasterForTest(Configuration conf) throws IOException, KeeperException {
|
||||
super(conf);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected AssignmentManager createAssignmentManager(MasterServices master) {
|
||||
return new AssignmentManagerForTest(master);
|
||||
}
|
||||
}
|
||||
|
||||
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
|
||||
|
||||
private static TableName NAME = TableName.valueOf("Race");
|
||||
|
||||
private static byte[] CF = Bytes.toBytes("cf");
|
||||
|
||||
@BeforeClass
|
||||
public static void setUp() throws Exception {
|
||||
UTIL.getConfiguration().setClass(HConstants.MASTER_IMPL, HMasterForTest.class, HMaster.class);
|
||||
UTIL.getConfiguration().setInt("hbase.regionserver.msginterval", 1000);
|
||||
UTIL.startMiniCluster(1);
|
||||
UTIL.createTable(NAME, CF);
|
||||
UTIL.waitTableAvailable(NAME);
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void tearDown() throws Exception {
|
||||
UTIL.shutdownMiniCluster();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRace() throws Exception {
|
||||
RegionInfo region = UTIL.getMiniHBaseCluster().getRegions(NAME).get(0).getRegionInfo();
|
||||
ProcedureExecutor<MasterProcedureEnv> procExec =
|
||||
UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor();
|
||||
AssignmentManager am = UTIL.getMiniHBaseCluster().getMaster().getAssignmentManager();
|
||||
RegionStateNode rsn = am.getRegionStates().getRegionStateNode(region);
|
||||
|
||||
// halt a regionServerReport
|
||||
RESUME_RS_REPORT = new CountDownLatch(1);
|
||||
ARRIVE_RS_REPORT = new CountDownLatch(1);
|
||||
FINISH_RS_REPORT = new CountDownLatch(1);
|
||||
|
||||
ARRIVE_RS_REPORT.await();
|
||||
|
||||
// schedule a TRSP to REOPEN the region
|
||||
RESUME_REPORT_STATE = new CountDownLatch(1);
|
||||
Future<byte[]> future =
|
||||
am.moveAsync(new RegionPlan(region, rsn.getRegionLocation(), rsn.getRegionLocation()));
|
||||
TransitRegionStateProcedure proc =
|
||||
procExec.getProcedures().stream().filter(p -> p instanceof TransitRegionStateProcedure)
|
||||
.filter(p -> !p.isFinished()).map(p -> (TransitRegionStateProcedure) p).findAny().get();
|
||||
IdLock procExecLock = procExec.getProcExecutionLock();
|
||||
// a CloseRegionProcedure and then the OpenRegionProcedure we want to block
|
||||
IdLock.Entry lockEntry = procExecLock.getLockEntry(proc.getProcId() + 2);
|
||||
// resume the reportRegionStateTransition to finish the CloseRegionProcedure
|
||||
RESUME_REPORT_STATE.countDown();
|
||||
// wait until we schedule the OpenRegionProcedure
|
||||
UTIL.waitFor(10000,
|
||||
() -> proc.getCurrentStateId() == REGION_STATE_TRANSITION_CONFIRM_OPENED_VALUE);
|
||||
// the region should be in OPENING state
|
||||
assertEquals(RegionState.State.OPENING, rsn.getState());
|
||||
// resume the region server report
|
||||
RESUME_RS_REPORT.countDown();
|
||||
// wait until it finishes, it will find that the region is opened on the rs
|
||||
FINISH_RS_REPORT.await();
|
||||
// let the OpenRegionProcedure go
|
||||
procExecLock.releaseLockEntry(lockEntry);
|
||||
// wait until the TRSP is done
|
||||
future.get();
|
||||
|
||||
// confirm that the region can still be write, i.e, the regionServerReport method should not
|
||||
// change the region state to OPEN
|
||||
try (Table table = UTIL.getConnection().getTableBuilder(NAME, null).setWriteRpcTimeout(1000)
|
||||
.setOperationTimeout(2000).build()) {
|
||||
table.put(
|
||||
new Put(Bytes.toBytes("key")).addColumn(CF, Bytes.toBytes("cq"), Bytes.toBytes("val")));
|
||||
}
|
||||
}
|
||||
}
|
|
@ -28,7 +28,6 @@ import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
|
|||
import org.apache.hadoop.hbase.testclassification.MasterTests;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.junit.Assert;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
|
@ -37,15 +36,17 @@ import org.junit.rules.TestName;
|
|||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@Category({MasterTests.class, MediumTests.class})
|
||||
@Category({ MasterTests.class, MediumTests.class })
|
||||
public class TestEnableTableProcedure extends TestTableDDLProcedureBase {
|
||||
|
||||
@ClassRule
|
||||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestEnableTableProcedure.class);
|
||||
HBaseClassTestRule.forClass(TestEnableTableProcedure.class);
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(TestEnableTableProcedure.class);
|
||||
@Rule public TestName name = new TestName();
|
||||
|
||||
@Rule
|
||||
public TestName name = new TestName();
|
||||
|
||||
@Test
|
||||
public void testEnableTable() throws Exception {
|
||||
|
@ -56,15 +57,15 @@ public class TestEnableTableProcedure extends TestTableDDLProcedureBase {
|
|||
UTIL.getAdmin().disableTable(tableName);
|
||||
|
||||
// Enable the table
|
||||
long procId = procExec.submitProcedure(
|
||||
new EnableTableProcedure(procExec.getEnvironment(), tableName, false));
|
||||
long procId =
|
||||
procExec.submitProcedure(new EnableTableProcedure(procExec.getEnvironment(), tableName));
|
||||
// Wait the completion
|
||||
ProcedureTestingUtility.waitProcedure(procExec, procId);
|
||||
ProcedureTestingUtility.assertProcNotFailed(procExec, procId);
|
||||
MasterProcedureTestingUtility.validateTableIsEnabled(getMaster(), tableName);
|
||||
}
|
||||
|
||||
@Test(expected=TableNotDisabledException.class)
|
||||
@Test(expected = TableNotDisabledException.class)
|
||||
public void testEnableNonDisabledTable() throws Exception {
|
||||
final TableName tableName = TableName.valueOf(name.getMethodName());
|
||||
final ProcedureExecutor<MasterProcedureEnv> procExec = getMasterProcedureExecutor();
|
||||
|
@ -72,8 +73,8 @@ public class TestEnableTableProcedure extends TestTableDDLProcedureBase {
|
|||
MasterProcedureTestingUtility.createTable(procExec, tableName, null, "f1", "f2");
|
||||
|
||||
// Enable the table - expect failure
|
||||
long procId1 = procExec.submitProcedure(
|
||||
new EnableTableProcedure(procExec.getEnvironment(), tableName, false));
|
||||
long procId1 =
|
||||
procExec.submitProcedure(new EnableTableProcedure(procExec.getEnvironment(), tableName));
|
||||
ProcedureTestingUtility.waitProcedure(procExec, procId1);
|
||||
|
||||
Procedure<?> result = procExec.getResult(procId1);
|
||||
|
@ -82,19 +83,11 @@ public class TestEnableTableProcedure extends TestTableDDLProcedureBase {
|
|||
assertTrue(
|
||||
ProcedureTestingUtility.getExceptionCause(result) instanceof TableNotDisabledException);
|
||||
|
||||
// Enable the table with skipping table state check flag (simulate recovery scenario)
|
||||
long procId2 = procExec.submitProcedure(
|
||||
new EnableTableProcedure(procExec.getEnvironment(), tableName, true));
|
||||
// Wait the completion
|
||||
ProcedureTestingUtility.waitProcedure(procExec, procId2);
|
||||
ProcedureTestingUtility.assertProcNotFailed(procExec, procId2);
|
||||
|
||||
// Enable the table - expect failure from ProcedurePrepareLatch
|
||||
final ProcedurePrepareLatch prepareLatch = new ProcedurePrepareLatch.CompatibilityLatch();
|
||||
procExec.submitProcedure(
|
||||
new EnableTableProcedure(procExec.getEnvironment(), tableName, false, prepareLatch));
|
||||
new EnableTableProcedure(procExec.getEnvironment(), tableName, prepareLatch));
|
||||
prepareLatch.await();
|
||||
Assert.fail("Enable should throw exception through latch.");
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -102,9 +95,8 @@ public class TestEnableTableProcedure extends TestTableDDLProcedureBase {
|
|||
final TableName tableName = TableName.valueOf(name.getMethodName());
|
||||
final ProcedureExecutor<MasterProcedureEnv> procExec = getMasterProcedureExecutor();
|
||||
|
||||
final byte[][] splitKeys = new byte[][] {
|
||||
Bytes.toBytes("a"), Bytes.toBytes("b"), Bytes.toBytes("c")
|
||||
};
|
||||
final byte[][] splitKeys =
|
||||
new byte[][] { Bytes.toBytes("a"), Bytes.toBytes("b"), Bytes.toBytes("c") };
|
||||
MasterProcedureTestingUtility.createTable(procExec, tableName, splitKeys, "f1", "f2");
|
||||
UTIL.getAdmin().disableTable(tableName);
|
||||
ProcedureTestingUtility.waitNoProcedureRunning(procExec);
|
||||
|
@ -112,8 +104,8 @@ public class TestEnableTableProcedure extends TestTableDDLProcedureBase {
|
|||
ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, true);
|
||||
|
||||
// Start the Enable procedure && kill the executor
|
||||
long procId = procExec.submitProcedure(
|
||||
new EnableTableProcedure(procExec.getEnvironment(), tableName, false));
|
||||
long procId =
|
||||
procExec.submitProcedure(new EnableTableProcedure(procExec.getEnvironment(), tableName));
|
||||
|
||||
// Restart the executor and execute the step twice
|
||||
MasterProcedureTestingUtility.testRecoveryAndDoubleExecution(procExec, procId);
|
||||
|
@ -126,17 +118,16 @@ public class TestEnableTableProcedure extends TestTableDDLProcedureBase {
|
|||
final TableName tableName = TableName.valueOf(name.getMethodName());
|
||||
final ProcedureExecutor<MasterProcedureEnv> procExec = getMasterProcedureExecutor();
|
||||
|
||||
final byte[][] splitKeys = new byte[][] {
|
||||
Bytes.toBytes("a"), Bytes.toBytes("b"), Bytes.toBytes("c")
|
||||
};
|
||||
final byte[][] splitKeys =
|
||||
new byte[][] { Bytes.toBytes("a"), Bytes.toBytes("b"), Bytes.toBytes("c") };
|
||||
MasterProcedureTestingUtility.createTable(procExec, tableName, splitKeys, "f1", "f2");
|
||||
UTIL.getAdmin().disableTable(tableName);
|
||||
ProcedureTestingUtility.waitNoProcedureRunning(procExec);
|
||||
ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, true);
|
||||
|
||||
// Start the Enable procedure && kill the executor
|
||||
long procId = procExec.submitProcedure(
|
||||
new EnableTableProcedure(procExec.getEnvironment(), tableName, false));
|
||||
long procId =
|
||||
procExec.submitProcedure(new EnableTableProcedure(procExec.getEnvironment(), tableName));
|
||||
|
||||
int lastStep = 3; // fail before ENABLE_TABLE_SET_ENABLING_TABLE_STATE
|
||||
MasterProcedureTestingUtility.testRollbackAndDoubleExecution(procExec, procId, lastStep);
|
||||
|
|
|
@ -289,7 +289,7 @@ public class TestMasterFailoverWithProcedures {
|
|||
|
||||
// Start the Delete procedure && kill the executor
|
||||
long procId = procExec.submitProcedure(
|
||||
new EnableTableProcedure(procExec.getEnvironment(), tableName, false));
|
||||
new EnableTableProcedure(procExec.getEnvironment(), tableName));
|
||||
testRecoveryAndDoubleExecution(UTIL, procId, step);
|
||||
|
||||
MasterProcedureTestingUtility.validateTableIsEnabled(
|
||||
|
|
Loading…
Reference in New Issue