HBASE-21465 Retry on reportRegionStateTransition can lead to unexpected errors
This commit is contained in:
parent
66c2a7487c
commit
fbf5e9e0c4
|
@ -17,6 +17,9 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.master.assignment;
|
||||
|
||||
import static org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RegionStateTransitionState.REGION_STATE_TRANSITION_CONFIRM_CLOSED;
|
||||
import static org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RegionStateTransitionState.REGION_STATE_TRANSITION_CONFIRM_OPENED;
|
||||
|
||||
import edu.umd.cs.findbugs.annotations.Nullable;
|
||||
import java.io.IOException;
|
||||
import org.apache.hadoop.hbase.HBaseIOException;
|
||||
|
@ -40,6 +43,7 @@ import org.slf4j.Logger;
|
|||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RegionStateTransitionState;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RegionStateTransitionStateData;
|
||||
|
@ -346,71 +350,139 @@ public class TransitRegionStateProcedure
|
|||
return false; // 'false' means that this procedure handled the timeout
|
||||
}
|
||||
|
||||
private void reportTransitionOpened(MasterProcedureEnv env, RegionStateNode regionNode,
|
||||
ServerName serverName, TransitionCode code, long openSeqNum) throws IOException {
|
||||
switch (code) {
|
||||
case OPENED:
|
||||
if (openSeqNum < 0) {
|
||||
throw new UnexpectedStateException("Received report unexpected " + code +
|
||||
" transition openSeqNum=" + openSeqNum + ", " + regionNode);
|
||||
}
|
||||
if (openSeqNum <= regionNode.getOpenSeqNum()) {
|
||||
if (openSeqNum != 0) {
|
||||
LOG.warn("Skip update of openSeqNum for {} with {} because the currentSeqNum={}",
|
||||
regionNode, openSeqNum, regionNode.getOpenSeqNum());
|
||||
}
|
||||
} else {
|
||||
regionNode.setOpenSeqNum(openSeqNum);
|
||||
}
|
||||
env.getAssignmentManager().regionOpened(regionNode);
|
||||
if (lastState == RegionStateTransitionState.REGION_STATE_TRANSITION_CONFIRM_OPENED) {
|
||||
// we are done
|
||||
regionNode.unsetProcedure(this);
|
||||
}
|
||||
regionNode.getProcedureEvent().wake(env.getProcedureScheduler());
|
||||
break;
|
||||
case FAILED_OPEN:
|
||||
// just wake up the procedure and see if we can retry
|
||||
regionNode.getProcedureEvent().wake(env.getProcedureScheduler());
|
||||
break;
|
||||
default:
|
||||
throw new UnexpectedStateException(
|
||||
"Received report unexpected " + code + " transition openSeqNum=" + openSeqNum + ", " +
|
||||
regionNode.toShortString() + ", " + this + ", expected OPENED or FAILED_OPEN.");
|
||||
private boolean isOpening(RegionStateNode regionNode, ServerName serverName,
|
||||
TransitionCode code) {
|
||||
if (!regionNode.isInState(State.OPENING)) {
|
||||
LOG.warn("Received report {} transition from {} for {}, pid={}, but the region is not in" +
|
||||
" OPENING state, should be a retry, ignore", code, serverName, regionNode, getProcId());
|
||||
return false;
|
||||
}
|
||||
if (getCurrentState() != REGION_STATE_TRANSITION_CONFIRM_OPENED) {
|
||||
LOG.warn(
|
||||
"Received report {} transition from {} for {}, pid={}," +
|
||||
" but the TRSP is not in {} state, should be a retry, ignore",
|
||||
code, serverName, regionNode, getProcId(), REGION_STATE_TRANSITION_CONFIRM_OPENED);
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
private void reportTransitionOpen(MasterProcedureEnv env, RegionStateNode regionNode,
|
||||
ServerName serverName, long openSeqNum) throws IOException {
|
||||
if (!isOpening(regionNode, serverName, TransitionCode.OPENED)) {
|
||||
return;
|
||||
}
|
||||
if (openSeqNum < 0) {
|
||||
throw new UnexpectedStateException("Received report unexpected " + TransitionCode.OPENED +
|
||||
" transition openSeqNum=" + openSeqNum + ", " + regionNode + ", proc=" + this);
|
||||
}
|
||||
if (openSeqNum < regionNode.getOpenSeqNum()) {
|
||||
// use the openSeqNum as a fence, if this is not a retry, then the openSeqNum should be
|
||||
// greater than or equal to the existing one.
|
||||
LOG.warn(
|
||||
"Received report {} transition from {} for {}, pid={} but the new openSeqNum {}" +
|
||||
" is less than the current one {}, should be a retry, ignore",
|
||||
TransitionCode.OPENED, serverName, regionNode, getProcId(), openSeqNum,
|
||||
regionNode.getOpenSeqNum());
|
||||
return;
|
||||
}
|
||||
// notice that it is possible for a region to still have the same openSeqNum if it crashes and
|
||||
// we haven't written anything into it. That's why we can not just change the above condition
|
||||
// from '<' to '<='. So here we still need to check whether the serverName
|
||||
// matches, to determine whether this is a retry when the openSeqNum is not changed.
|
||||
if (!regionNode.getRegionLocation().equals(serverName)) {
|
||||
LOG.warn("Received report {} transition from {} for {}, pid={} but the region is not on it," +
|
||||
" should be a retry, ignore", TransitionCode.OPENED, serverName, regionNode, getProcId());
|
||||
return;
|
||||
}
|
||||
regionNode.setOpenSeqNum(openSeqNum);
|
||||
env.getAssignmentManager().regionOpened(regionNode);
|
||||
if (lastState == RegionStateTransitionState.REGION_STATE_TRANSITION_CONFIRM_OPENED) {
|
||||
// we are done
|
||||
regionNode.unsetProcedure(this);
|
||||
}
|
||||
regionNode.getProcedureEvent().wake(env.getProcedureScheduler());
|
||||
}
|
||||
|
||||
private void reportTransitionFailedOpen(MasterProcedureEnv env, RegionStateNode regionNode,
|
||||
ServerName serverName) {
|
||||
if (!isOpening(regionNode, serverName, TransitionCode.FAILED_OPEN)) {
|
||||
return;
|
||||
}
|
||||
// there is no openSeqNum for FAILED_OPEN, so we will check the target server instead
|
||||
if (!regionNode.getRegionLocation().equals(serverName)) {
|
||||
LOG.warn(
|
||||
"Received report {} transition from {} for {}, pid={}," +
|
||||
" but the region is not on it, should be a retry, ignore",
|
||||
TransitionCode.FAILED_OPEN, regionNode, serverName, getProcId());
|
||||
return;
|
||||
}
|
||||
// just wake up the procedure and see if we can retry
|
||||
// Notice that, even if we arrive here, this call could still be a retry, as we may retry
|
||||
// opening on the same server again. And the assumption here is that, once the region state is
|
||||
// OPENING, and the TRSP state is REGION_STATE_TRANSITION_CONFIRM_OPENED, the TRSP must have
|
||||
// been suspended on the procedure event, so after the waking operation here, the TRSP will be
|
||||
// executed and try to schedule new OpenRegionProcedure again. Once there is a successful open
|
||||
// then we are done, so the TRSP will not be stuck.
|
||||
// TODO: maybe we could send the procedure id of the OpenRegionProcedure to the region server
|
||||
// and let the region server send it back when done, so it will be easy to detect whether this
|
||||
// is a retry.
|
||||
regionNode.getProcedureEvent().wake(env.getProcedureScheduler());
|
||||
}
|
||||
|
||||
// we do not need seqId for closing a region
|
||||
private void reportTransitionClosed(MasterProcedureEnv env, RegionStateNode regionNode,
|
||||
ServerName serverName, TransitionCode code) throws IOException {
|
||||
switch (code) {
|
||||
case CLOSED:
|
||||
env.getAssignmentManager().regionClosed(regionNode, true);
|
||||
if (lastState == RegionStateTransitionState.REGION_STATE_TRANSITION_CONFIRM_CLOSED) {
|
||||
// we are done
|
||||
regionNode.unsetProcedure(this);
|
||||
}
|
||||
regionNode.getProcedureEvent().wake(env.getProcedureScheduler());
|
||||
break;
|
||||
default:
|
||||
throw new UnexpectedStateException("Received report unexpected " + code + " transition, " +
|
||||
regionNode.toShortString() + ", " + this + ", expected CLOSED.");
|
||||
ServerName serverName) throws IOException {
|
||||
if (!regionNode.isInState(State.CLOSING)) {
|
||||
LOG.warn(
|
||||
"Received report {} transition from {} for {}, pid={}" +
|
||||
", but the region is not in CLOSING state, should be a retry, ignore",
|
||||
TransitionCode.CLOSED, serverName, regionNode, getProcId());
|
||||
return;
|
||||
}
|
||||
if (getCurrentState() != REGION_STATE_TRANSITION_CONFIRM_CLOSED) {
|
||||
LOG.warn(
|
||||
"Received report {} transition from {} for {}, pid={} but the proc is not in {}" +
|
||||
" state, should be a retry, ignore",
|
||||
TransitionCode.CLOSED, serverName, regionNode, getProcId(),
|
||||
REGION_STATE_TRANSITION_CONFIRM_CLOSED);
|
||||
return;
|
||||
}
|
||||
if (!regionNode.getRegionLocation().equals(serverName)) {
|
||||
LOG.warn(
|
||||
"Received report {} transition from {} for {}, pid={}," +
|
||||
" but the region is not on it, should be a retry, ignore",
|
||||
TransitionCode.CLOSED, serverName, regionNode, getProcId());
|
||||
return;
|
||||
}
|
||||
env.getAssignmentManager().regionClosed(regionNode, true);
|
||||
if (lastState == RegionStateTransitionState.REGION_STATE_TRANSITION_CONFIRM_CLOSED) {
|
||||
// we are done
|
||||
regionNode.unsetProcedure(this);
|
||||
}
|
||||
regionNode.getProcedureEvent().wake(env.getProcedureScheduler());
|
||||
}
|
||||
|
||||
// Should be called with RegionStateNode locked
|
||||
public void reportTransition(MasterProcedureEnv env, RegionStateNode regionNode,
|
||||
ServerName serverName, TransitionCode code, long seqId) throws IOException {
|
||||
switch (getCurrentState()) {
|
||||
case REGION_STATE_TRANSITION_CONFIRM_OPENED:
|
||||
reportTransitionOpened(env, regionNode, serverName, code, seqId);
|
||||
// It is possible that the previous reportRegionStateTransition call was succeeded at master
|
||||
// side, but before returning the result to region server, the rpc connection was broken, or the
|
||||
// master restarted. The region server will try calling reportRegionStateTransition again under
|
||||
// this scenario, so here we need to check whether this is a retry.
|
||||
switch (code) {
|
||||
case OPENED:
|
||||
reportTransitionOpen(env, regionNode, serverName, seqId);
|
||||
break;
|
||||
case REGION_STATE_TRANSITION_CONFIRM_CLOSED:
|
||||
reportTransitionClosed(env, regionNode, serverName, code);
|
||||
case FAILED_OPEN:
|
||||
reportTransitionFailedOpen(env, regionNode, serverName);
|
||||
break;
|
||||
case CLOSED:
|
||||
reportTransitionClosed(env, regionNode, serverName);
|
||||
break;
|
||||
default:
|
||||
LOG.warn("{} received unexpected report transition call from {}, code={}, seqId={}", this,
|
||||
serverName, code, seqId);
|
||||
throw new UnexpectedStateException("Received report unexpected " + code + " transition, " +
|
||||
regionNode.toShortString() + ", " + this + ", expected OPENED or FAILED_OPEN or CLOSED.");
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -478,9 +550,8 @@ public class TransitRegionStateProcedure
|
|||
@Override
|
||||
protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
|
||||
super.serializeStateData(serializer);
|
||||
RegionStateTransitionStateData.Builder builder =
|
||||
RegionStateTransitionStateData.newBuilder().setInitialState(initialState)
|
||||
.setLastState(lastState).setForceNewPlan(forceNewPlan);
|
||||
RegionStateTransitionStateData.Builder builder = RegionStateTransitionStateData.newBuilder()
|
||||
.setInitialState(initialState).setLastState(lastState).setForceNewPlan(forceNewPlan);
|
||||
if (assignCandidate != null) {
|
||||
builder.setAssignCandidate(ProtobufUtil.toServerName(assignCandidate));
|
||||
}
|
||||
|
|
|
@ -261,12 +261,12 @@ public abstract class TestAssignmentManagerBase {
|
|||
|
||||
protected void sendTransitionReport(final ServerName serverName,
|
||||
final org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionInfo regionInfo,
|
||||
final TransitionCode state) throws IOException {
|
||||
final TransitionCode state, long seqId) throws IOException {
|
||||
ReportRegionStateTransitionRequest.Builder req =
|
||||
ReportRegionStateTransitionRequest.newBuilder();
|
||||
req.setServer(ProtobufUtil.toServerName(serverName));
|
||||
req.addTransition(RegionStateTransition.newBuilder().addRegionInfo(regionInfo)
|
||||
.setTransitionCode(state).setOpenSeqNum(1).build());
|
||||
.setTransitionCode(state).setOpenSeqNum(seqId).build());
|
||||
am.reportRegionStateTransition(req.build());
|
||||
}
|
||||
|
||||
|
@ -286,7 +286,11 @@ public abstract class TestAssignmentManagerBase {
|
|||
@Override
|
||||
protected RegionOpeningState execOpenRegion(ServerName server, RegionOpenInfo openReq)
|
||||
throws IOException {
|
||||
sendTransitionReport(server, openReq.getRegion(), TransitionCode.OPENED);
|
||||
RegionInfo hri = ProtobufUtil.toRegionInfo(openReq.getRegion());
|
||||
long previousOpenSeqNum =
|
||||
am.getRegionStates().getOrCreateRegionStateNode(hri).getOpenSeqNum();
|
||||
sendTransitionReport(server, openReq.getRegion(), TransitionCode.OPENED,
|
||||
previousOpenSeqNum + 2);
|
||||
// Concurrency?
|
||||
// Now update the state of our cluster in regionsToRegionServers.
|
||||
SortedSet<byte[]> regions = regionsToRegionServers.get(server);
|
||||
|
@ -294,7 +298,6 @@ public abstract class TestAssignmentManagerBase {
|
|||
regions = new ConcurrentSkipListSet<byte[]>(Bytes.BYTES_COMPARATOR);
|
||||
regionsToRegionServers.put(server, regions);
|
||||
}
|
||||
RegionInfo hri = ProtobufUtil.toRegionInfo(openReq.getRegion());
|
||||
if (regions.contains(hri.getRegionName())) {
|
||||
throw new UnsupportedOperationException(hri.getRegionNameAsString());
|
||||
}
|
||||
|
@ -306,7 +309,7 @@ public abstract class TestAssignmentManagerBase {
|
|||
protected CloseRegionResponse execCloseRegion(ServerName server, byte[] regionName)
|
||||
throws IOException {
|
||||
RegionInfo hri = am.getRegionInfo(regionName);
|
||||
sendTransitionReport(server, ProtobufUtil.toRegionInfo(hri), TransitionCode.CLOSED);
|
||||
sendTransitionReport(server, ProtobufUtil.toRegionInfo(hri), TransitionCode.CLOSED, -1);
|
||||
return CloseRegionResponse.newBuilder().setClosed(true).build();
|
||||
}
|
||||
}
|
||||
|
@ -497,18 +500,18 @@ public abstract class TestAssignmentManagerBase {
|
|||
@Override
|
||||
protected RegionOpeningState execOpenRegion(final ServerName server, RegionOpenInfo openReq)
|
||||
throws IOException {
|
||||
switch (rand.nextInt(6)) {
|
||||
RegionInfo hri = ProtobufUtil.toRegionInfo(openReq.getRegion());
|
||||
long previousOpenSeqNum =
|
||||
am.getRegionStates().getOrCreateRegionStateNode(hri).getOpenSeqNum();
|
||||
switch (rand.nextInt(3)) {
|
||||
case 0:
|
||||
LOG.info("Return OPENED response");
|
||||
sendTransitionReport(server, openReq.getRegion(), TransitionCode.OPENED);
|
||||
sendTransitionReport(server, openReq.getRegion(), TransitionCode.OPENED,
|
||||
previousOpenSeqNum + 2);
|
||||
return OpenRegionResponse.RegionOpeningState.OPENED;
|
||||
case 1:
|
||||
LOG.info("Return transition report that OPENED/ALREADY_OPENED response");
|
||||
sendTransitionReport(server, openReq.getRegion(), TransitionCode.OPENED);
|
||||
return OpenRegionResponse.RegionOpeningState.ALREADY_OPENED;
|
||||
case 2:
|
||||
LOG.info("Return transition report that FAILED_OPEN/FAILED_OPENING response");
|
||||
sendTransitionReport(server, openReq.getRegion(), TransitionCode.FAILED_OPEN);
|
||||
sendTransitionReport(server, openReq.getRegion(), TransitionCode.FAILED_OPEN, -1);
|
||||
return OpenRegionResponse.RegionOpeningState.FAILED_OPENING;
|
||||
default:
|
||||
// fall out
|
||||
|
@ -534,7 +537,7 @@ public abstract class TestAssignmentManagerBase {
|
|||
boolean closed = rand.nextBoolean();
|
||||
if (closed) {
|
||||
RegionInfo hri = am.getRegionInfo(regionName);
|
||||
sendTransitionReport(server, ProtobufUtil.toRegionInfo(hri), TransitionCode.CLOSED);
|
||||
sendTransitionReport(server, ProtobufUtil.toRegionInfo(hri), TransitionCode.CLOSED, -1);
|
||||
}
|
||||
resp.setClosed(closed);
|
||||
return resp.build();
|
||||
|
|
|
@ -0,0 +1,146 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.master.assignment;
|
||||
|
||||
import static org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RegionStateTransitionState.REGION_STATE_TRANSITION_CONFIRM_OPENED_VALUE;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.PleaseHoldException;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.Put;
|
||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||
import org.apache.hadoop.hbase.client.Table;
|
||||
import org.apache.hadoop.hbase.master.HMaster;
|
||||
import org.apache.hadoop.hbase.master.MasterServices;
|
||||
import org.apache.hadoop.hbase.master.RegionPlan;
|
||||
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
|
||||
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
|
||||
import org.apache.hadoop.hbase.testclassification.MasterTests;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse;
|
||||
|
||||
@Category({ MasterTests.class, MediumTests.class })
|
||||
public class TestReportRegionStateTransitionRetry {
|
||||
|
||||
@ClassRule
|
||||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestReportRegionStateTransitionRetry.class);
|
||||
|
||||
private static final AtomicReference<CountDownLatch> RESUME_AND_FAIL = new AtomicReference<>();
|
||||
|
||||
private static final class AssignmentManagerForTest extends AssignmentManager {
|
||||
|
||||
public AssignmentManagerForTest(MasterServices master) {
|
||||
super(master);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ReportRegionStateTransitionResponse reportRegionStateTransition(
|
||||
ReportRegionStateTransitionRequest req) throws PleaseHoldException {
|
||||
ReportRegionStateTransitionResponse resp = super.reportRegionStateTransition(req);
|
||||
CountDownLatch latch = RESUME_AND_FAIL.getAndSet(null);
|
||||
if (latch != null) {
|
||||
try {
|
||||
latch.await();
|
||||
} catch (InterruptedException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
throw new PleaseHoldException("Inject error");
|
||||
}
|
||||
return resp;
|
||||
}
|
||||
}
|
||||
|
||||
public static final class HMasterForTest extends HMaster {
|
||||
|
||||
public HMasterForTest(Configuration conf) throws IOException, KeeperException {
|
||||
super(conf);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected AssignmentManager createAssignmentManager(MasterServices master) {
|
||||
return new AssignmentManagerForTest(master);
|
||||
}
|
||||
}
|
||||
|
||||
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
|
||||
|
||||
private static TableName NAME = TableName.valueOf("Retry");
|
||||
|
||||
private static byte[] CF = Bytes.toBytes("cf");
|
||||
|
||||
@BeforeClass
|
||||
public static void setUp() throws Exception {
|
||||
UTIL.getConfiguration().setClass(HConstants.MASTER_IMPL, HMasterForTest.class, HMaster.class);
|
||||
UTIL.startMiniCluster(1);
|
||||
UTIL.createTable(NAME, CF);
|
||||
UTIL.waitTableAvailable(NAME);
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void tearDown() throws Exception {
|
||||
UTIL.shutdownMiniCluster();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRetryOnClose() throws Exception {
|
||||
RegionInfo region = UTIL.getMiniHBaseCluster().getRegions(NAME).get(0).getRegionInfo();
|
||||
ProcedureExecutor<MasterProcedureEnv> procExec =
|
||||
UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor();
|
||||
AssignmentManager am = UTIL.getMiniHBaseCluster().getMaster().getAssignmentManager();
|
||||
RegionStateNode rsn = am.getRegionStates().getRegionStateNode(region);
|
||||
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
RESUME_AND_FAIL.set(latch);
|
||||
Future<byte[]> future =
|
||||
am.moveAsync(new RegionPlan(region, rsn.getRegionLocation(), rsn.getRegionLocation()));
|
||||
TransitRegionStateProcedure proc =
|
||||
procExec.getProcedures().stream().filter(p -> p instanceof TransitRegionStateProcedure)
|
||||
.filter(p -> !p.isFinished()).map(p -> (TransitRegionStateProcedure) p).findAny().get();
|
||||
|
||||
// wait until we schedule the OpenRegionProcedure
|
||||
UTIL.waitFor(10000,
|
||||
() -> proc.getCurrentStateId() == REGION_STATE_TRANSITION_CONFIRM_OPENED_VALUE);
|
||||
// Fail the reportRegionStateTransition for closing
|
||||
latch.countDown();
|
||||
future.get();
|
||||
|
||||
// confirm that the region can still be write
|
||||
try (Table table = UTIL.getConnection().getTableBuilder(NAME, null).setWriteRpcTimeout(1000)
|
||||
.setOperationTimeout(2000).build()) {
|
||||
table.put(
|
||||
new Put(Bytes.toBytes("key")).addColumn(CF, Bytes.toBytes("cq"), Bytes.toBytes("val")));
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue