HBASE-21465 Retry on reportRegionStateTransition can lead to unexpected errors
This commit is contained in:
parent
d486bba630
commit
640a5e390b
|
@ -17,6 +17,9 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hbase.master.assignment;
|
package org.apache.hadoop.hbase.master.assignment;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RegionStateTransitionState.REGION_STATE_TRANSITION_CONFIRM_CLOSED;
|
||||||
|
import static org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RegionStateTransitionState.REGION_STATE_TRANSITION_CONFIRM_OPENED;
|
||||||
|
|
||||||
import edu.umd.cs.findbugs.annotations.Nullable;
|
import edu.umd.cs.findbugs.annotations.Nullable;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import org.apache.hadoop.hbase.HBaseIOException;
|
import org.apache.hadoop.hbase.HBaseIOException;
|
||||||
|
@ -40,6 +43,7 @@ import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
|
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
|
||||||
|
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RegionStateTransitionState;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RegionStateTransitionState;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RegionStateTransitionStateData;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RegionStateTransitionStateData;
|
||||||
|
@ -346,71 +350,139 @@ public class TransitRegionStateProcedure
|
||||||
return false; // 'false' means that this procedure handled the timeout
|
return false; // 'false' means that this procedure handled the timeout
|
||||||
}
|
}
|
||||||
|
|
||||||
private void reportTransitionOpened(MasterProcedureEnv env, RegionStateNode regionNode,
|
private boolean isOpening(RegionStateNode regionNode, ServerName serverName,
|
||||||
ServerName serverName, TransitionCode code, long openSeqNum) throws IOException {
|
TransitionCode code) {
|
||||||
switch (code) {
|
if (!regionNode.isInState(State.OPENING)) {
|
||||||
case OPENED:
|
LOG.warn("Received report {} transition from {} for {}, pid={}, but the region is not in" +
|
||||||
if (openSeqNum < 0) {
|
" OPENING state, should be a retry, ignore", code, serverName, regionNode, getProcId());
|
||||||
throw new UnexpectedStateException("Received report unexpected " + code +
|
return false;
|
||||||
" transition openSeqNum=" + openSeqNum + ", " + regionNode);
|
|
||||||
}
|
|
||||||
if (openSeqNum <= regionNode.getOpenSeqNum()) {
|
|
||||||
if (openSeqNum != 0) {
|
|
||||||
LOG.warn("Skip update of openSeqNum for {} with {} because the currentSeqNum={}",
|
|
||||||
regionNode, openSeqNum, regionNode.getOpenSeqNum());
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
regionNode.setOpenSeqNum(openSeqNum);
|
|
||||||
}
|
|
||||||
env.getAssignmentManager().regionOpened(regionNode);
|
|
||||||
if (lastState == RegionStateTransitionState.REGION_STATE_TRANSITION_CONFIRM_OPENED) {
|
|
||||||
// we are done
|
|
||||||
regionNode.unsetProcedure(this);
|
|
||||||
}
|
|
||||||
regionNode.getProcedureEvent().wake(env.getProcedureScheduler());
|
|
||||||
break;
|
|
||||||
case FAILED_OPEN:
|
|
||||||
// just wake up the procedure and see if we can retry
|
|
||||||
regionNode.getProcedureEvent().wake(env.getProcedureScheduler());
|
|
||||||
break;
|
|
||||||
default:
|
|
||||||
throw new UnexpectedStateException(
|
|
||||||
"Received report unexpected " + code + " transition openSeqNum=" + openSeqNum + ", " +
|
|
||||||
regionNode.toShortString() + ", " + this + ", expected OPENED or FAILED_OPEN.");
|
|
||||||
}
|
}
|
||||||
|
if (getCurrentState() != REGION_STATE_TRANSITION_CONFIRM_OPENED) {
|
||||||
|
LOG.warn(
|
||||||
|
"Received report {} transition from {} for {}, pid={}," +
|
||||||
|
" but the TRSP is not in {} state, should be a retry, ignore",
|
||||||
|
code, serverName, regionNode, getProcId(), REGION_STATE_TRANSITION_CONFIRM_OPENED);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void reportTransitionOpen(MasterProcedureEnv env, RegionStateNode regionNode,
|
||||||
|
ServerName serverName, long openSeqNum) throws IOException {
|
||||||
|
if (!isOpening(regionNode, serverName, TransitionCode.OPENED)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (openSeqNum < 0) {
|
||||||
|
throw new UnexpectedStateException("Received report unexpected " + TransitionCode.OPENED +
|
||||||
|
" transition openSeqNum=" + openSeqNum + ", " + regionNode + ", proc=" + this);
|
||||||
|
}
|
||||||
|
if (openSeqNum < regionNode.getOpenSeqNum()) {
|
||||||
|
// use the openSeqNum as a fence: if this is not a retry, then the openSeqNum should be
|
||||||
|
// greater than or equal to the existing one.
|
||||||
|
LOG.warn(
|
||||||
|
"Received report {} transition from {} for {}, pid={} but the new openSeqNum {}" +
|
||||||
|
" is less than the current one {}, should be a retry, ignore",
|
||||||
|
TransitionCode.OPENED, serverName, regionNode, getProcId(), openSeqNum,
|
||||||
|
regionNode.getOpenSeqNum());
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
// notice that it is possible for a region to still have the same openSeqNum if it crashes and
|
||||||
|
// we haven't written anything into it. That's why we cannot just change the above condition
|
||||||
|
// from '<' to '<='. So here we still need to check whether the serverName
|
||||||
|
// matches, to determine whether this is a retry when the openSeqNum is not changed.
|
||||||
|
if (!regionNode.getRegionLocation().equals(serverName)) {
|
||||||
|
LOG.warn("Received report {} transition from {} for {}, pid={} but the region is not on it," +
|
||||||
|
" should be a retry, ignore", TransitionCode.OPENED, serverName, regionNode, getProcId());
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
regionNode.setOpenSeqNum(openSeqNum);
|
||||||
|
env.getAssignmentManager().regionOpened(regionNode);
|
||||||
|
if (lastState == RegionStateTransitionState.REGION_STATE_TRANSITION_CONFIRM_OPENED) {
|
||||||
|
// we are done
|
||||||
|
regionNode.unsetProcedure(this);
|
||||||
|
}
|
||||||
|
regionNode.getProcedureEvent().wake(env.getProcedureScheduler());
|
||||||
|
}
|
||||||
|
|
||||||
|
private void reportTransitionFailedOpen(MasterProcedureEnv env, RegionStateNode regionNode,
|
||||||
|
ServerName serverName) {
|
||||||
|
if (!isOpening(regionNode, serverName, TransitionCode.FAILED_OPEN)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
// there is no openSeqNum for FAILED_OPEN, so we will check the target server instead
|
||||||
|
if (!regionNode.getRegionLocation().equals(serverName)) {
|
||||||
|
LOG.warn(
|
||||||
|
"Received report {} transition from {} for {}, pid={}," +
|
||||||
|
" but the region is not on it, should be a retry, ignore",
|
||||||
|
TransitionCode.FAILED_OPEN, regionNode, serverName, getProcId());
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
// just wake up the procedure and see if we can retry
|
||||||
|
// Notice that, even if we arrive here, this call could still be a retry, as we may retry
|
||||||
|
// opening on the same server again. And the assumption here is that, once the region state is
|
||||||
|
// OPENING, and the TRSP state is REGION_STATE_TRANSITION_CONFIRM_OPENED, the TRSP must have
|
||||||
|
// been suspended on the procedure event, so after the waking operation here, the TRSP will be
|
||||||
|
// executed and try to schedule new OpenRegionProcedure again. Once there is a successful open
|
||||||
|
// then we are done, so the TRSP will not be stuck.
|
||||||
|
// TODO: maybe we could send the procedure id of the OpenRegionProcedure to the region server
|
||||||
|
// and let the region server send it back when done, so it will be easy to detect whether this
|
||||||
|
// is a retry.
|
||||||
|
regionNode.getProcedureEvent().wake(env.getProcedureScheduler());
|
||||||
}
|
}
|
||||||
|
|
||||||
// we do not need seqId for closing a region
|
// we do not need seqId for closing a region
|
||||||
private void reportTransitionClosed(MasterProcedureEnv env, RegionStateNode regionNode,
|
private void reportTransitionClosed(MasterProcedureEnv env, RegionStateNode regionNode,
|
||||||
ServerName serverName, TransitionCode code) throws IOException {
|
ServerName serverName) throws IOException {
|
||||||
switch (code) {
|
if (!regionNode.isInState(State.CLOSING)) {
|
||||||
case CLOSED:
|
LOG.warn(
|
||||||
env.getAssignmentManager().regionClosed(regionNode, true);
|
"Received report {} transition from {} for {}, pid={}" +
|
||||||
if (lastState == RegionStateTransitionState.REGION_STATE_TRANSITION_CONFIRM_CLOSED) {
|
", but the region is not in CLOSING state, should be a retry, ignore",
|
||||||
// we are done
|
TransitionCode.CLOSED, serverName, regionNode, getProcId());
|
||||||
regionNode.unsetProcedure(this);
|
return;
|
||||||
}
|
|
||||||
regionNode.getProcedureEvent().wake(env.getProcedureScheduler());
|
|
||||||
break;
|
|
||||||
default:
|
|
||||||
throw new UnexpectedStateException("Received report unexpected " + code + " transition, " +
|
|
||||||
regionNode.toShortString() + ", " + this + ", expected CLOSED.");
|
|
||||||
}
|
}
|
||||||
|
if (getCurrentState() != REGION_STATE_TRANSITION_CONFIRM_CLOSED) {
|
||||||
|
LOG.warn(
|
||||||
|
"Received report {} transition from {} for {}, pid={} but the proc is not in {}" +
|
||||||
|
" state, should be a retry, ignore",
|
||||||
|
TransitionCode.CLOSED, serverName, regionNode, getProcId(),
|
||||||
|
REGION_STATE_TRANSITION_CONFIRM_CLOSED);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (!regionNode.getRegionLocation().equals(serverName)) {
|
||||||
|
LOG.warn(
|
||||||
|
"Received report {} transition from {} for {}, pid={}," +
|
||||||
|
" but the region is not on it, should be a retry, ignore",
|
||||||
|
TransitionCode.CLOSED, serverName, regionNode, getProcId());
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
env.getAssignmentManager().regionClosed(regionNode, true);
|
||||||
|
if (lastState == RegionStateTransitionState.REGION_STATE_TRANSITION_CONFIRM_CLOSED) {
|
||||||
|
// we are done
|
||||||
|
regionNode.unsetProcedure(this);
|
||||||
|
}
|
||||||
|
regionNode.getProcedureEvent().wake(env.getProcedureScheduler());
|
||||||
}
|
}
|
||||||
|
|
||||||
// Should be called with RegionStateNode locked
|
// Should be called with RegionStateNode locked
|
||||||
public void reportTransition(MasterProcedureEnv env, RegionStateNode regionNode,
|
public void reportTransition(MasterProcedureEnv env, RegionStateNode regionNode,
|
||||||
ServerName serverName, TransitionCode code, long seqId) throws IOException {
|
ServerName serverName, TransitionCode code, long seqId) throws IOException {
|
||||||
switch (getCurrentState()) {
|
// It is possible that the previous reportRegionStateTransition call succeeded at master
|
||||||
case REGION_STATE_TRANSITION_CONFIRM_OPENED:
|
// side, but before returning the result to region server, the rpc connection was broken, or the
|
||||||
reportTransitionOpened(env, regionNode, serverName, code, seqId);
|
// master restarted. The region server will try calling reportRegionStateTransition again under
|
||||||
|
// this scenario, so here we need to check whether this is a retry.
|
||||||
|
switch (code) {
|
||||||
|
case OPENED:
|
||||||
|
reportTransitionOpen(env, regionNode, serverName, seqId);
|
||||||
break;
|
break;
|
||||||
case REGION_STATE_TRANSITION_CONFIRM_CLOSED:
|
case FAILED_OPEN:
|
||||||
reportTransitionClosed(env, regionNode, serverName, code);
|
reportTransitionFailedOpen(env, regionNode, serverName);
|
||||||
|
break;
|
||||||
|
case CLOSED:
|
||||||
|
reportTransitionClosed(env, regionNode, serverName);
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
LOG.warn("{} received unexpected report transition call from {}, code={}, seqId={}", this,
|
throw new UnexpectedStateException("Received report unexpected " + code + " transition, " +
|
||||||
serverName, code, seqId);
|
regionNode.toShortString() + ", " + this + ", expected OPENED or FAILED_OPEN or CLOSED.");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -478,9 +550,8 @@ public class TransitRegionStateProcedure
|
||||||
@Override
|
@Override
|
||||||
protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
|
protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
|
||||||
super.serializeStateData(serializer);
|
super.serializeStateData(serializer);
|
||||||
RegionStateTransitionStateData.Builder builder =
|
RegionStateTransitionStateData.Builder builder = RegionStateTransitionStateData.newBuilder()
|
||||||
RegionStateTransitionStateData.newBuilder().setInitialState(initialState)
|
.setInitialState(initialState).setLastState(lastState).setForceNewPlan(forceNewPlan);
|
||||||
.setLastState(lastState).setForceNewPlan(forceNewPlan);
|
|
||||||
if (assignCandidate != null) {
|
if (assignCandidate != null) {
|
||||||
builder.setAssignCandidate(ProtobufUtil.toServerName(assignCandidate));
|
builder.setAssignCandidate(ProtobufUtil.toServerName(assignCandidate));
|
||||||
}
|
}
|
||||||
|
|
|
@ -261,12 +261,12 @@ public abstract class TestAssignmentManagerBase {
|
||||||
|
|
||||||
protected void sendTransitionReport(final ServerName serverName,
|
protected void sendTransitionReport(final ServerName serverName,
|
||||||
final org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionInfo regionInfo,
|
final org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionInfo regionInfo,
|
||||||
final TransitionCode state) throws IOException {
|
final TransitionCode state, long seqId) throws IOException {
|
||||||
ReportRegionStateTransitionRequest.Builder req =
|
ReportRegionStateTransitionRequest.Builder req =
|
||||||
ReportRegionStateTransitionRequest.newBuilder();
|
ReportRegionStateTransitionRequest.newBuilder();
|
||||||
req.setServer(ProtobufUtil.toServerName(serverName));
|
req.setServer(ProtobufUtil.toServerName(serverName));
|
||||||
req.addTransition(RegionStateTransition.newBuilder().addRegionInfo(regionInfo)
|
req.addTransition(RegionStateTransition.newBuilder().addRegionInfo(regionInfo)
|
||||||
.setTransitionCode(state).setOpenSeqNum(1).build());
|
.setTransitionCode(state).setOpenSeqNum(seqId).build());
|
||||||
am.reportRegionStateTransition(req.build());
|
am.reportRegionStateTransition(req.build());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -286,7 +286,11 @@ public abstract class TestAssignmentManagerBase {
|
||||||
@Override
|
@Override
|
||||||
protected RegionOpeningState execOpenRegion(ServerName server, RegionOpenInfo openReq)
|
protected RegionOpeningState execOpenRegion(ServerName server, RegionOpenInfo openReq)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
sendTransitionReport(server, openReq.getRegion(), TransitionCode.OPENED);
|
RegionInfo hri = ProtobufUtil.toRegionInfo(openReq.getRegion());
|
||||||
|
long previousOpenSeqNum =
|
||||||
|
am.getRegionStates().getOrCreateRegionStateNode(hri).getOpenSeqNum();
|
||||||
|
sendTransitionReport(server, openReq.getRegion(), TransitionCode.OPENED,
|
||||||
|
previousOpenSeqNum + 2);
|
||||||
// Concurrency?
|
// Concurrency?
|
||||||
// Now update the state of our cluster in regionsToRegionServers.
|
// Now update the state of our cluster in regionsToRegionServers.
|
||||||
SortedSet<byte[]> regions = regionsToRegionServers.get(server);
|
SortedSet<byte[]> regions = regionsToRegionServers.get(server);
|
||||||
|
@ -294,7 +298,6 @@ public abstract class TestAssignmentManagerBase {
|
||||||
regions = new ConcurrentSkipListSet<byte[]>(Bytes.BYTES_COMPARATOR);
|
regions = new ConcurrentSkipListSet<byte[]>(Bytes.BYTES_COMPARATOR);
|
||||||
regionsToRegionServers.put(server, regions);
|
regionsToRegionServers.put(server, regions);
|
||||||
}
|
}
|
||||||
RegionInfo hri = ProtobufUtil.toRegionInfo(openReq.getRegion());
|
|
||||||
if (regions.contains(hri.getRegionName())) {
|
if (regions.contains(hri.getRegionName())) {
|
||||||
throw new UnsupportedOperationException(hri.getRegionNameAsString());
|
throw new UnsupportedOperationException(hri.getRegionNameAsString());
|
||||||
}
|
}
|
||||||
|
@ -306,7 +309,7 @@ public abstract class TestAssignmentManagerBase {
|
||||||
protected CloseRegionResponse execCloseRegion(ServerName server, byte[] regionName)
|
protected CloseRegionResponse execCloseRegion(ServerName server, byte[] regionName)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
RegionInfo hri = am.getRegionInfo(regionName);
|
RegionInfo hri = am.getRegionInfo(regionName);
|
||||||
sendTransitionReport(server, ProtobufUtil.toRegionInfo(hri), TransitionCode.CLOSED);
|
sendTransitionReport(server, ProtobufUtil.toRegionInfo(hri), TransitionCode.CLOSED, -1);
|
||||||
return CloseRegionResponse.newBuilder().setClosed(true).build();
|
return CloseRegionResponse.newBuilder().setClosed(true).build();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -497,18 +500,18 @@ public abstract class TestAssignmentManagerBase {
|
||||||
@Override
|
@Override
|
||||||
protected RegionOpeningState execOpenRegion(final ServerName server, RegionOpenInfo openReq)
|
protected RegionOpeningState execOpenRegion(final ServerName server, RegionOpenInfo openReq)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
switch (rand.nextInt(6)) {
|
RegionInfo hri = ProtobufUtil.toRegionInfo(openReq.getRegion());
|
||||||
|
long previousOpenSeqNum =
|
||||||
|
am.getRegionStates().getOrCreateRegionStateNode(hri).getOpenSeqNum();
|
||||||
|
switch (rand.nextInt(3)) {
|
||||||
case 0:
|
case 0:
|
||||||
LOG.info("Return OPENED response");
|
LOG.info("Return OPENED response");
|
||||||
sendTransitionReport(server, openReq.getRegion(), TransitionCode.OPENED);
|
sendTransitionReport(server, openReq.getRegion(), TransitionCode.OPENED,
|
||||||
|
previousOpenSeqNum + 2);
|
||||||
return OpenRegionResponse.RegionOpeningState.OPENED;
|
return OpenRegionResponse.RegionOpeningState.OPENED;
|
||||||
case 1:
|
case 1:
|
||||||
LOG.info("Return transition report that OPENED/ALREADY_OPENED response");
|
|
||||||
sendTransitionReport(server, openReq.getRegion(), TransitionCode.OPENED);
|
|
||||||
return OpenRegionResponse.RegionOpeningState.ALREADY_OPENED;
|
|
||||||
case 2:
|
|
||||||
LOG.info("Return transition report that FAILED_OPEN/FAILED_OPENING response");
|
LOG.info("Return transition report that FAILED_OPEN/FAILED_OPENING response");
|
||||||
sendTransitionReport(server, openReq.getRegion(), TransitionCode.FAILED_OPEN);
|
sendTransitionReport(server, openReq.getRegion(), TransitionCode.FAILED_OPEN, -1);
|
||||||
return OpenRegionResponse.RegionOpeningState.FAILED_OPENING;
|
return OpenRegionResponse.RegionOpeningState.FAILED_OPENING;
|
||||||
default:
|
default:
|
||||||
// fall out
|
// fall out
|
||||||
|
@ -534,7 +537,7 @@ public abstract class TestAssignmentManagerBase {
|
||||||
boolean closed = rand.nextBoolean();
|
boolean closed = rand.nextBoolean();
|
||||||
if (closed) {
|
if (closed) {
|
||||||
RegionInfo hri = am.getRegionInfo(regionName);
|
RegionInfo hri = am.getRegionInfo(regionName);
|
||||||
sendTransitionReport(server, ProtobufUtil.toRegionInfo(hri), TransitionCode.CLOSED);
|
sendTransitionReport(server, ProtobufUtil.toRegionInfo(hri), TransitionCode.CLOSED, -1);
|
||||||
}
|
}
|
||||||
resp.setClosed(closed);
|
resp.setClosed(closed);
|
||||||
return resp.build();
|
return resp.build();
|
||||||
|
|
|
@ -0,0 +1,146 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.master.assignment;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RegionStateTransitionState.REGION_STATE_TRANSITION_CONFIRM_OPENED_VALUE;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
import java.util.concurrent.Future;
|
||||||
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||||
|
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||||
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
|
import org.apache.hadoop.hbase.PleaseHoldException;
|
||||||
|
import org.apache.hadoop.hbase.TableName;
|
||||||
|
import org.apache.hadoop.hbase.client.Put;
|
||||||
|
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||||
|
import org.apache.hadoop.hbase.client.Table;
|
||||||
|
import org.apache.hadoop.hbase.master.HMaster;
|
||||||
|
import org.apache.hadoop.hbase.master.MasterServices;
|
||||||
|
import org.apache.hadoop.hbase.master.RegionPlan;
|
||||||
|
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
|
||||||
|
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.MasterTests;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||||
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
import org.apache.zookeeper.KeeperException;
|
||||||
|
import org.junit.AfterClass;
|
||||||
|
import org.junit.BeforeClass;
|
||||||
|
import org.junit.ClassRule;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.experimental.categories.Category;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
|
||||||
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse;
|
||||||
|
|
||||||
|
@Category({ MasterTests.class, MediumTests.class })
|
||||||
|
public class TestReportRegionStateTransitionRetry {
|
||||||
|
|
||||||
|
@ClassRule
|
||||||
|
public static final HBaseClassTestRule CLASS_RULE =
|
||||||
|
HBaseClassTestRule.forClass(TestReportRegionStateTransitionRetry.class);
|
||||||
|
|
||||||
|
private static final AtomicReference<CountDownLatch> RESUME_AND_FAIL = new AtomicReference<>();
|
||||||
|
|
||||||
|
private static final class AssignmentManagerForTest extends AssignmentManager {
|
||||||
|
|
||||||
|
public AssignmentManagerForTest(MasterServices master) {
|
||||||
|
super(master);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ReportRegionStateTransitionResponse reportRegionStateTransition(
|
||||||
|
ReportRegionStateTransitionRequest req) throws PleaseHoldException {
|
||||||
|
ReportRegionStateTransitionResponse resp = super.reportRegionStateTransition(req);
|
||||||
|
CountDownLatch latch = RESUME_AND_FAIL.getAndSet(null);
|
||||||
|
if (latch != null) {
|
||||||
|
try {
|
||||||
|
latch.await();
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
throw new PleaseHoldException("Inject error");
|
||||||
|
}
|
||||||
|
return resp;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static final class HMasterForTest extends HMaster {
|
||||||
|
|
||||||
|
public HMasterForTest(Configuration conf) throws IOException, KeeperException {
|
||||||
|
super(conf);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected AssignmentManager createAssignmentManager(MasterServices master) {
|
||||||
|
return new AssignmentManagerForTest(master);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
|
||||||
|
|
||||||
|
private static TableName NAME = TableName.valueOf("Retry");
|
||||||
|
|
||||||
|
private static byte[] CF = Bytes.toBytes("cf");
|
||||||
|
|
||||||
|
@BeforeClass
|
||||||
|
public static void setUp() throws Exception {
|
||||||
|
UTIL.getConfiguration().setClass(HConstants.MASTER_IMPL, HMasterForTest.class, HMaster.class);
|
||||||
|
UTIL.startMiniCluster(1);
|
||||||
|
UTIL.createTable(NAME, CF);
|
||||||
|
UTIL.waitTableAvailable(NAME);
|
||||||
|
}
|
||||||
|
|
||||||
|
@AfterClass
|
||||||
|
public static void tearDown() throws Exception {
|
||||||
|
UTIL.shutdownMiniCluster();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRetryOnClose() throws Exception {
|
||||||
|
RegionInfo region = UTIL.getMiniHBaseCluster().getRegions(NAME).get(0).getRegionInfo();
|
||||||
|
ProcedureExecutor<MasterProcedureEnv> procExec =
|
||||||
|
UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor();
|
||||||
|
AssignmentManager am = UTIL.getMiniHBaseCluster().getMaster().getAssignmentManager();
|
||||||
|
RegionStateNode rsn = am.getRegionStates().getRegionStateNode(region);
|
||||||
|
|
||||||
|
CountDownLatch latch = new CountDownLatch(1);
|
||||||
|
RESUME_AND_FAIL.set(latch);
|
||||||
|
Future<byte[]> future =
|
||||||
|
am.moveAsync(new RegionPlan(region, rsn.getRegionLocation(), rsn.getRegionLocation()));
|
||||||
|
TransitRegionStateProcedure proc =
|
||||||
|
procExec.getProcedures().stream().filter(p -> p instanceof TransitRegionStateProcedure)
|
||||||
|
.filter(p -> !p.isFinished()).map(p -> (TransitRegionStateProcedure) p).findAny().get();
|
||||||
|
|
||||||
|
// wait until we schedule the OpenRegionProcedure
|
||||||
|
UTIL.waitFor(10000,
|
||||||
|
() -> proc.getCurrentStateId() == REGION_STATE_TRANSITION_CONFIRM_OPENED_VALUE);
|
||||||
|
// Fail the reportRegionStateTransition for closing
|
||||||
|
latch.countDown();
|
||||||
|
future.get();
|
||||||
|
|
||||||
|
// confirm that the region can still be written to
|
||||||
|
try (Table table = UTIL.getConnection().getTableBuilder(NAME, null).setWriteRpcTimeout(1000)
|
||||||
|
.setOperationTimeout(2000).build()) {
|
||||||
|
table.put(
|
||||||
|
new Put(Bytes.toBytes("key")).addColumn(CF, Bytes.toBytes("cq"), Bytes.toBytes("val")));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue