Revert "HBASE-21465 Retry on reportRegionStateTransition can lead to unexpected errors"

This reverts commit ffb003ee44.
This commit is contained in:
zhangduo 2018-11-13 22:24:59 +08:00
parent 3eb27a099a
commit 64c4861272
3 changed files with 67 additions and 277 deletions

View File

@ -17,9 +17,6 @@
*/
package org.apache.hadoop.hbase.master.assignment;
import static org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RegionStateTransitionState.REGION_STATE_TRANSITION_CONFIRM_CLOSED;
import static org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RegionStateTransitionState.REGION_STATE_TRANSITION_CONFIRM_OPENED;
import edu.umd.cs.findbugs.annotations.Nullable;
import java.io.IOException;
import org.apache.hadoop.hbase.HBaseIOException;
@ -43,7 +40,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RegionStateTransitionState;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RegionStateTransitionStateData;
@ -350,39 +346,18 @@ public class TransitRegionStateProcedure
return false; // 'false' means that this procedure handled the timeout
}
private boolean isOpening(RegionStateNode regionNode, ServerName serverName,
TransitionCode code) {
if (!regionNode.isInState(State.OPENING)) {
LOG.warn("Received report {} transition from {}, pid={}, but the region {} is not in" +
" OPENING state, should be a retry, ignore", code, serverName, getProcId(), regionNode);
return false;
}
if (getCurrentState() != REGION_STATE_TRANSITION_CONFIRM_OPENED) {
LOG.warn(
"Received report {} transition from {}, pid={}, but the TRSP is not in {} state," +
" should be a retry, ignore",
code, serverName, getProcId(), REGION_STATE_TRANSITION_CONFIRM_OPENED);
return false;
}
return true;
}
private void reportTransitionOpen(MasterProcedureEnv env, RegionStateNode regionNode,
ServerName serverName, long openSeqNum) throws IOException {
if (!isOpening(regionNode, serverName, TransitionCode.OPENED)) {
return;
}
private void reportTransitionOpened(MasterProcedureEnv env, RegionStateNode regionNode,
ServerName serverName, TransitionCode code, long openSeqNum) throws IOException {
switch (code) {
case OPENED:
if (openSeqNum < 0) {
throw new UnexpectedStateException("Received report unexpected " + TransitionCode.OPENED +
" transition openSeqNum=" + openSeqNum + ", " + regionNode + ", proc=" + this);
throw new UnexpectedStateException("Received report unexpected " + code +
" transition openSeqNum=" + openSeqNum + ", " + regionNode);
}
if (openSeqNum <= regionNode.getOpenSeqNum()) {
// use the openSeqNum as a fence, if this is not a retry, then the openSeqNum should be
// greater than the existing one.
if (openSeqNum != 0) {
LOG.warn("Skip update of region state for {} with openSeqNum={}, pid={} because the" +
" currentSeqNum={}", regionNode, openSeqNum, getProcId(), regionNode.getOpenSeqNum());
return;
LOG.warn("Skip update of openSeqNum for {} with {} because the currentSeqNum={}",
regionNode, openSeqNum, regionNode.getOpenSeqNum());
}
} else {
regionNode.setOpenSeqNum(openSeqNum);
@ -393,86 +368,49 @@ public class TransitRegionStateProcedure
regionNode.unsetProcedure(this);
}
regionNode.getProcedureEvent().wake(env.getProcedureScheduler());
}
private void reportTransitionFailedOpen(MasterProcedureEnv env, RegionStateNode regionNode,
ServerName serverName) {
if (!isOpening(regionNode, serverName, TransitionCode.FAILED_OPEN)) {
return;
}
// there is no openSeqNum for FAILED_OPEN, so we will check the target server instead
if (!regionNode.getRegionLocation().equals(serverName)) {
LOG.warn(
"Received report {} transition from {}, pid={}, but the region {} is not on it," +
" should be a retry, ignore",
TransitionCode.FAILED_OPEN, serverName, getProcId(), regionNode);
return;
}
break;
case FAILED_OPEN:
// just wake up the procedure and see if we can retry
// Notice that, even if we arrive here, this call could still be a retry, as we may retry
// opening on the same server again. And the assumption here is that, once the region state is
// OPENING, and the TRSP state is REGION_STATE_TRANSITION_CONFIRM_OPENED, the TRSP must have
// been suspended on the procedure event, so after the waking operation here, the TRSP will be
// executed and try to schedule new OpenRegionProcedure again. Once there is a successful open
// then we are done, so the TRSP will not be stuck.
// TODO: maybe we could send the procedure id of the OpenRegionProcedure to the region server
// and let the region server send it back when done, so it will be easy to detect whether this
// is a retry.
regionNode.getProcedureEvent().wake(env.getProcedureScheduler());
break;
default:
throw new UnexpectedStateException(
"Received report unexpected " + code + " transition openSeqNum=" + openSeqNum + ", " +
regionNode.toShortString() + ", " + this + ", expected OPENED or FAILED_OPEN.");
}
}
// we do not need seqId for closing a region
private void reportTransitionClosed(MasterProcedureEnv env, RegionStateNode regionNode,
ServerName serverName) throws IOException {
if (!regionNode.isInState(State.CLOSING)) {
LOG.warn(
"Received report {} transition from {}, pid={}, but the region {} is not in" +
" CLOSING state, should be a retry, ignore",
TransitionCode.CLOSED, serverName, getProcId(), regionNode);
return;
}
if (getCurrentState() != REGION_STATE_TRANSITION_CONFIRM_CLOSED) {
LOG.warn(
"Received report {} transition from {}, pid={} but the proc is not in {}" +
" state, should be a retry, ignore",
TransitionCode.CLOSED, serverName, getProcId(), REGION_STATE_TRANSITION_CONFIRM_CLOSED);
return;
}
if (!regionNode.getRegionLocation().equals(serverName)) {
LOG.warn(
"Received report {} transition from {}, pid={}, but the region {} is not on it," +
" should be a retry, ignore",
TransitionCode.CLOSED, serverName, getProcId(), regionNode);
return;
}
ServerName serverName, TransitionCode code) throws IOException {
switch (code) {
case CLOSED:
env.getAssignmentManager().regionClosed(regionNode, true);
if (lastState == RegionStateTransitionState.REGION_STATE_TRANSITION_CONFIRM_CLOSED) {
// we are done
regionNode.unsetProcedure(this);
}
regionNode.getProcedureEvent().wake(env.getProcedureScheduler());
break;
default:
throw new UnexpectedStateException("Received report unexpected " + code + " transition, " +
regionNode.toShortString() + ", " + this + ", expected CLOSED.");
}
}
// Should be called with RegionStateNode locked
public void reportTransition(MasterProcedureEnv env, RegionStateNode regionNode,
ServerName serverName, TransitionCode code, long seqId) throws IOException {
// It is possible that the previous reportRegionStateTransition call was succeeded at master
// side, but before returning the result to region server, the rpc connection was broken, or the
// master restarted. The region server will try calling reportRegionStateTransition again under
// this scenario, so here we need to check whether this is a retry.
switch (code) {
case OPENED:
reportTransitionOpen(env, regionNode, serverName, seqId);
switch (getCurrentState()) {
case REGION_STATE_TRANSITION_CONFIRM_OPENED:
reportTransitionOpened(env, regionNode, serverName, code, seqId);
break;
case FAILED_OPEN:
reportTransitionFailedOpen(env, regionNode, serverName);
break;
case CLOSED:
reportTransitionClosed(env, regionNode, serverName);
case REGION_STATE_TRANSITION_CONFIRM_CLOSED:
reportTransitionClosed(env, regionNode, serverName, code);
break;
default:
throw new UnexpectedStateException("Received report unexpected " + code + " transition, " +
regionNode.toShortString() + ", " + this + ", expected OPENED or FAILED_OPEN or CLOSED.");
LOG.warn("{} received unexpected report transition call from {}, code={}, seqId={}", this,
serverName, code, seqId);
}
}
@ -540,8 +478,9 @@ public class TransitRegionStateProcedure
@Override
protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
super.serializeStateData(serializer);
RegionStateTransitionStateData.Builder builder = RegionStateTransitionStateData.newBuilder()
.setInitialState(initialState).setLastState(lastState).setForceNewPlan(forceNewPlan);
RegionStateTransitionStateData.Builder builder =
RegionStateTransitionStateData.newBuilder().setInitialState(initialState)
.setLastState(lastState).setForceNewPlan(forceNewPlan);
if (assignCandidate != null) {
builder.setAssignCandidate(ProtobufUtil.toServerName(assignCandidate));
}

View File

@ -261,12 +261,12 @@ public abstract class TestAssignmentManagerBase {
protected void sendTransitionReport(final ServerName serverName,
final org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionInfo regionInfo,
final TransitionCode state, long seqId) throws IOException {
final TransitionCode state) throws IOException {
ReportRegionStateTransitionRequest.Builder req =
ReportRegionStateTransitionRequest.newBuilder();
req.setServer(ProtobufUtil.toServerName(serverName));
req.addTransition(RegionStateTransition.newBuilder().addRegionInfo(regionInfo)
.setTransitionCode(state).setOpenSeqNum(seqId).build());
.setTransitionCode(state).setOpenSeqNum(1).build());
am.reportRegionStateTransition(req.build());
}
@ -286,11 +286,7 @@ public abstract class TestAssignmentManagerBase {
@Override
protected RegionOpeningState execOpenRegion(ServerName server, RegionOpenInfo openReq)
throws IOException {
RegionInfo hri = ProtobufUtil.toRegionInfo(openReq.getRegion());
long previousOpenSeqNum =
am.getRegionStates().getOrCreateRegionStateNode(hri).getOpenSeqNum();
sendTransitionReport(server, openReq.getRegion(), TransitionCode.OPENED,
previousOpenSeqNum + 2);
sendTransitionReport(server, openReq.getRegion(), TransitionCode.OPENED);
// Concurrency?
// Now update the state of our cluster in regionsToRegionServers.
SortedSet<byte[]> regions = regionsToRegionServers.get(server);
@ -298,6 +294,7 @@ public abstract class TestAssignmentManagerBase {
regions = new ConcurrentSkipListSet<byte[]>(Bytes.BYTES_COMPARATOR);
regionsToRegionServers.put(server, regions);
}
RegionInfo hri = ProtobufUtil.toRegionInfo(openReq.getRegion());
if (regions.contains(hri.getRegionName())) {
throw new UnsupportedOperationException(hri.getRegionNameAsString());
}
@ -309,7 +306,7 @@ public abstract class TestAssignmentManagerBase {
protected CloseRegionResponse execCloseRegion(ServerName server, byte[] regionName)
throws IOException {
RegionInfo hri = am.getRegionInfo(regionName);
sendTransitionReport(server, ProtobufUtil.toRegionInfo(hri), TransitionCode.CLOSED, -1);
sendTransitionReport(server, ProtobufUtil.toRegionInfo(hri), TransitionCode.CLOSED);
return CloseRegionResponse.newBuilder().setClosed(true).build();
}
}
@ -500,18 +497,18 @@ public abstract class TestAssignmentManagerBase {
@Override
protected RegionOpeningState execOpenRegion(final ServerName server, RegionOpenInfo openReq)
throws IOException {
RegionInfo hri = ProtobufUtil.toRegionInfo(openReq.getRegion());
long previousOpenSeqNum =
am.getRegionStates().getOrCreateRegionStateNode(hri).getOpenSeqNum();
switch (rand.nextInt(3)) {
switch (rand.nextInt(6)) {
case 0:
LOG.info("Return OPENED response");
sendTransitionReport(server, openReq.getRegion(), TransitionCode.OPENED,
previousOpenSeqNum + 2);
sendTransitionReport(server, openReq.getRegion(), TransitionCode.OPENED);
return OpenRegionResponse.RegionOpeningState.OPENED;
case 1:
LOG.info("Return transition report that OPENED/ALREADY_OPENED response");
sendTransitionReport(server, openReq.getRegion(), TransitionCode.OPENED);
return OpenRegionResponse.RegionOpeningState.ALREADY_OPENED;
case 2:
LOG.info("Return transition report that FAILED_OPEN/FAILED_OPENING response");
sendTransitionReport(server, openReq.getRegion(), TransitionCode.FAILED_OPEN, -1);
sendTransitionReport(server, openReq.getRegion(), TransitionCode.FAILED_OPEN);
return OpenRegionResponse.RegionOpeningState.FAILED_OPENING;
default:
// fall out
@ -537,7 +534,7 @@ public abstract class TestAssignmentManagerBase {
boolean closed = rand.nextBoolean();
if (closed) {
RegionInfo hri = am.getRegionInfo(regionName);
sendTransitionReport(server, ProtobufUtil.toRegionInfo(hri), TransitionCode.CLOSED, -1);
sendTransitionReport(server, ProtobufUtil.toRegionInfo(hri), TransitionCode.CLOSED);
}
resp.setClosed(closed);
return resp.build();

View File

@ -1,146 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.assignment;
import static org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RegionStateTransitionState.REGION_STATE_TRANSITION_CONFIRM_OPENED_VALUE;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.PleaseHoldException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.RegionPlan;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.zookeeper.KeeperException;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse;
@Category({ MasterTests.class, LargeTests.class })
public class TestReportRegionStateTransitionRetry {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestReportRegionStateTransitionRetry.class);
private static final AtomicReference<CountDownLatch> RESUME_AND_FAIL = new AtomicReference<>();
private static final class AssignmentManagerForTest extends AssignmentManager {
public AssignmentManagerForTest(MasterServices master) {
super(master);
}
@Override
public ReportRegionStateTransitionResponse reportRegionStateTransition(
ReportRegionStateTransitionRequest req) throws PleaseHoldException {
ReportRegionStateTransitionResponse resp = super.reportRegionStateTransition(req);
CountDownLatch latch = RESUME_AND_FAIL.getAndSet(null);
if (latch != null) {
try {
latch.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
throw new PleaseHoldException("Inject error");
}
return resp;
}
}
public static final class HMasterForTest extends HMaster {
public HMasterForTest(Configuration conf) throws IOException, KeeperException {
super(conf);
}
@Override
protected AssignmentManager createAssignmentManager(MasterServices master) {
return new AssignmentManagerForTest(master);
}
}
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
private static TableName NAME = TableName.valueOf("Retry");
private static byte[] CF = Bytes.toBytes("cf");
@BeforeClass
public static void setUp() throws Exception {
UTIL.getConfiguration().setClass(HConstants.MASTER_IMPL, HMasterForTest.class, HMaster.class);
UTIL.startMiniCluster(1);
UTIL.createTable(NAME, CF);
UTIL.waitTableAvailable(NAME);
}
@AfterClass
public static void tearDown() throws Exception {
UTIL.shutdownMiniCluster();
}
@Test
public void testRetryOnClose() throws Exception {
RegionInfo region = UTIL.getMiniHBaseCluster().getRegions(NAME).get(0).getRegionInfo();
ProcedureExecutor<MasterProcedureEnv> procExec =
UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor();
AssignmentManager am = UTIL.getMiniHBaseCluster().getMaster().getAssignmentManager();
RegionStateNode rsn = am.getRegionStates().getRegionStateNode(region);
CountDownLatch latch = new CountDownLatch(1);
RESUME_AND_FAIL.set(latch);
Future<byte[]> future =
am.moveAsync(new RegionPlan(region, rsn.getRegionLocation(), rsn.getRegionLocation()));
TransitRegionStateProcedure proc =
procExec.getProcedures().stream().filter(p -> p instanceof TransitRegionStateProcedure)
.filter(p -> !p.isFinished()).map(p -> (TransitRegionStateProcedure) p).findAny().get();
// wait until we schedule the OpenRegionProcedure
UTIL.waitFor(10000,
() -> proc.getCurrentStateId() == REGION_STATE_TRANSITION_CONFIRM_OPENED_VALUE);
// Fail the reportRegionStateTransition for closing
latch.countDown();
future.get();
// confirm that the region can still be write
try (Table table = UTIL.getConnection().getTableBuilder(NAME, null).setWriteRpcTimeout(1000)
.setOperationTimeout(2000).build()) {
table.put(
new Put(Bytes.toBytes("key")).addColumn(CF, Bytes.toBytes("cq"), Bytes.toBytes("val")));
}
}
}