diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/TransitRegionStateProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/TransitRegionStateProcedure.java index df1503ba5df..2bfe1aff503 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/TransitRegionStateProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/TransitRegionStateProcedure.java @@ -17,6 +17,9 @@ */ package org.apache.hadoop.hbase.master.assignment; +import static org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RegionStateTransitionState.REGION_STATE_TRANSITION_CONFIRM_CLOSED; +import static org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RegionStateTransitionState.REGION_STATE_TRANSITION_CONFIRM_OPENED; + import edu.umd.cs.findbugs.annotations.Nullable; import java.io.IOException; import org.apache.hadoop.hbase.HBaseIOException; @@ -40,6 +43,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; + import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RegionStateTransitionState; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RegionStateTransitionStateData; @@ -346,71 +350,129 @@ public class TransitRegionStateProcedure return false; // 'false' means that this procedure handled the timeout } - private void reportTransitionOpened(MasterProcedureEnv env, RegionStateNode regionNode, - ServerName serverName, TransitionCode code, long openSeqNum) throws IOException { - switch (code) { - case OPENED: - if (openSeqNum < 0) { - throw new UnexpectedStateException("Received report unexpected " + code + - " transition openSeqNum=" + openSeqNum + ", " + regionNode); - } - if (openSeqNum <= regionNode.getOpenSeqNum()) { - if (openSeqNum != 0) { - LOG.warn("Skip update of openSeqNum for {} with {} because the currentSeqNum={}", - regionNode, openSeqNum, regionNode.getOpenSeqNum()); - } - } else { - regionNode.setOpenSeqNum(openSeqNum); - } - env.getAssignmentManager().regionOpened(regionNode); - if (lastState == RegionStateTransitionState.REGION_STATE_TRANSITION_CONFIRM_OPENED) { - // we are done - regionNode.unsetProcedure(this); - } - regionNode.getProcedureEvent().wake(env.getProcedureScheduler()); - break; - case FAILED_OPEN: - // just wake up the procedure and see if we can retry - regionNode.getProcedureEvent().wake(env.getProcedureScheduler()); - break; - default: - throw new UnexpectedStateException( - "Received report unexpected " + code + " transition openSeqNum=" + openSeqNum + ", " + - regionNode.toShortString() + ", " + this + ", expected OPENED or FAILED_OPEN."); + private boolean isOpening(RegionStateNode regionNode, ServerName serverName, + TransitionCode code) { + if (!regionNode.isInState(State.OPENING)) { + LOG.warn("Received report {} transition from {}, pid={}, but the region {} is not in" + + " OPENING state, should be a retry, ignore", code, serverName, getProcId(), regionNode); + return false; } + if (getCurrentState() != REGION_STATE_TRANSITION_CONFIRM_OPENED) { + LOG.warn( + "Received report {} transition from {}, pid={}, but the TRSP is not in {} state," + + " should be a retry, ignore", + code, serverName, getProcId(), REGION_STATE_TRANSITION_CONFIRM_OPENED); + return false; + } + return true; + } + + private void reportTransitionOpen(MasterProcedureEnv env, RegionStateNode regionNode, + ServerName serverName, long openSeqNum) throws IOException { + if (!isOpening(regionNode, serverName, TransitionCode.OPENED)) { + return; + } + if (openSeqNum < 0) { + throw new UnexpectedStateException("Received report unexpected " + TransitionCode.OPENED + + " transition openSeqNum=" + openSeqNum + ", " + regionNode + ", proc=" + this); + } + if (openSeqNum <= regionNode.getOpenSeqNum()) { + // use the openSeqNum as a fence, if this is not a retry, then the openSeqNum should be + // greater than the existing one. + if (openSeqNum != 0) { + LOG.warn("Skip update of region state for {} with openSeqNum={}, pid={} because the" + + " currentSeqNum={}", regionNode, openSeqNum, getProcId(), regionNode.getOpenSeqNum()); + return; + } + } else { + regionNode.setOpenSeqNum(openSeqNum); + } + env.getAssignmentManager().regionOpened(regionNode); + if (lastState == RegionStateTransitionState.REGION_STATE_TRANSITION_CONFIRM_OPENED) { + // we are done + regionNode.unsetProcedure(this); + } + regionNode.getProcedureEvent().wake(env.getProcedureScheduler()); + } + + private void reportTransitionFailedOpen(MasterProcedureEnv env, RegionStateNode regionNode, + ServerName serverName) { + if (!isOpening(regionNode, serverName, TransitionCode.FAILED_OPEN)) { + return; + } + // there is no openSeqNum for FAILED_OPEN, so we will check the target server instead + if (!regionNode.getRegionLocation().equals(serverName)) { + LOG.warn( + "Received report {} transition from {}, pid={}, but the region {} is not on it," + + " should be a retry, ignore", + TransitionCode.FAILED_OPEN, serverName, getProcId(), regionNode); + return; + } + // just wake up the procedure and see if we can retry + // Notice that, even if we arrive here, this call could still be a retry, as we may retry + // opening on the same server again. And the assumption here is that, once the region state is + // OPENING, and the TRSP state is REGION_STATE_TRANSITION_CONFIRM_OPENED, the TRSP must have + // been suspended on the procedure event, so after the waking operation here, the TRSP will be + // executed and try to schedule new OpenRegionProcedure again. Once there is a successful open + // then we are done, so the TRSP will not be stuck. + // TODO: maybe we could send the procedure id of the OpenRegionProcedure to the region server + // and let the region server send it back when done, so it will be easy to detect whether this + // is a retry. + regionNode.getProcedureEvent().wake(env.getProcedureScheduler()); } // we do not need seqId for closing a region private void reportTransitionClosed(MasterProcedureEnv env, RegionStateNode regionNode, - ServerName serverName, TransitionCode code) throws IOException { - switch (code) { - case CLOSED: - env.getAssignmentManager().regionClosed(regionNode, true); - if (lastState == RegionStateTransitionState.REGION_STATE_TRANSITION_CONFIRM_CLOSED) { - // we are done - regionNode.unsetProcedure(this); - } - regionNode.getProcedureEvent().wake(env.getProcedureScheduler()); - break; - default: - throw new UnexpectedStateException("Received report unexpected " + code + " transition, " + - regionNode.toShortString() + ", " + this + ", expected CLOSED."); + ServerName serverName) throws IOException { + if (!regionNode.isInState(State.CLOSING)) { + LOG.warn( + "Received report {} transition from {}, pid={}, but the region {} is not in" + + " CLOSING state, should be a retry, ignore", + TransitionCode.CLOSED, serverName, getProcId(), regionNode); + return; } + if (getCurrentState() != REGION_STATE_TRANSITION_CONFIRM_CLOSED) { + LOG.warn( + "Received report {} transition from {}, pid={} but the proc is not in {}" + + " state, should be a retry, ignore", + TransitionCode.CLOSED, serverName, getProcId(), REGION_STATE_TRANSITION_CONFIRM_CLOSED); + return; + } + if (!regionNode.getRegionLocation().equals(serverName)) { + LOG.warn( + "Received report {} transition from {}, pid={}, but the region {} is not on it," + + " should be a retry, ignore", + TransitionCode.CLOSED, serverName, getProcId(), regionNode); + return; + } + env.getAssignmentManager().regionClosed(regionNode, true); + if (lastState == RegionStateTransitionState.REGION_STATE_TRANSITION_CONFIRM_CLOSED) { + // we are done + regionNode.unsetProcedure(this); + } + regionNode.getProcedureEvent().wake(env.getProcedureScheduler()); } // Should be called with RegionStateNode locked public void reportTransition(MasterProcedureEnv env, RegionStateNode regionNode, ServerName serverName, TransitionCode code, long seqId) throws IOException { - switch (getCurrentState()) { - case REGION_STATE_TRANSITION_CONFIRM_OPENED: - reportTransitionOpened(env, regionNode, serverName, code, seqId); + // It is possible that the previous reportRegionStateTransition call was succeeded at master + // side, but before returning the result to region server, the rpc connection was broken, or the + // master restarted. The region server will try calling reportRegionStateTransition again under + // this scenario, so here we need to check whether this is a retry. + switch (code) { + case OPENED: + reportTransitionOpen(env, regionNode, serverName, seqId); break; - case REGION_STATE_TRANSITION_CONFIRM_CLOSED: - reportTransitionClosed(env, regionNode, serverName, code); + case FAILED_OPEN: + reportTransitionFailedOpen(env, regionNode, serverName); + break; + case CLOSED: + reportTransitionClosed(env, regionNode, serverName); break; default: - LOG.warn("{} received unexpected report transition call from {}, code={}, seqId={}", this, - serverName, code, seqId); + throw new UnexpectedStateException("Received report unexpected " + code + " transition, " + + regionNode.toShortString() + ", " + this + ", expected OPENED or FAILED_OPEN or CLOSED."); } } @@ -478,9 +540,8 @@ public class TransitRegionStateProcedure @Override protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException { super.serializeStateData(serializer); - RegionStateTransitionStateData.Builder builder = - RegionStateTransitionStateData.newBuilder().setInitialState(initialState) - .setLastState(lastState).setForceNewPlan(forceNewPlan); + RegionStateTransitionStateData.Builder builder = RegionStateTransitionStateData.newBuilder() + .setInitialState(initialState).setLastState(lastState).setForceNewPlan(forceNewPlan); if (assignCandidate != null) { builder.setAssignCandidate(ProtobufUtil.toServerName(assignCandidate)); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManagerBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManagerBase.java index 1c97f372227..5f5a5766d28 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManagerBase.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManagerBase.java @@ -261,12 +261,12 @@ public abstract class TestAssignmentManagerBase { protected void sendTransitionReport(final ServerName serverName, final org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionInfo regionInfo, - final TransitionCode state) throws IOException { + final TransitionCode state, long seqId) throws IOException { ReportRegionStateTransitionRequest.Builder req = ReportRegionStateTransitionRequest.newBuilder(); req.setServer(ProtobufUtil.toServerName(serverName)); req.addTransition(RegionStateTransition.newBuilder().addRegionInfo(regionInfo) - .setTransitionCode(state).setOpenSeqNum(1).build()); + .setTransitionCode(state).setOpenSeqNum(seqId).build()); am.reportRegionStateTransition(req.build()); } @@ -286,7 +286,11 @@ public abstract class TestAssignmentManagerBase { @Override protected RegionOpeningState execOpenRegion(ServerName server, RegionOpenInfo openReq) throws IOException { - sendTransitionReport(server, openReq.getRegion(), TransitionCode.OPENED); + RegionInfo hri = ProtobufUtil.toRegionInfo(openReq.getRegion()); + long previousOpenSeqNum = + am.getRegionStates().getOrCreateRegionStateNode(hri).getOpenSeqNum(); + sendTransitionReport(server, openReq.getRegion(), TransitionCode.OPENED, + previousOpenSeqNum + 2); // Concurrency? // Now update the state of our cluster in regionsToRegionServers. SortedSet regions = regionsToRegionServers.get(server); @@ -294,7 +298,6 @@ public abstract class TestAssignmentManagerBase { regions = new ConcurrentSkipListSet(Bytes.BYTES_COMPARATOR); regionsToRegionServers.put(server, regions); } - RegionInfo hri = ProtobufUtil.toRegionInfo(openReq.getRegion()); if (regions.contains(hri.getRegionName())) { throw new UnsupportedOperationException(hri.getRegionNameAsString()); } @@ -306,7 +309,7 @@ public abstract class TestAssignmentManagerBase { protected CloseRegionResponse execCloseRegion(ServerName server, byte[] regionName) throws IOException { RegionInfo hri = am.getRegionInfo(regionName); - sendTransitionReport(server, ProtobufUtil.toRegionInfo(hri), TransitionCode.CLOSED); + sendTransitionReport(server, ProtobufUtil.toRegionInfo(hri), TransitionCode.CLOSED, -1); return CloseRegionResponse.newBuilder().setClosed(true).build(); } } @@ -497,18 +500,18 @@ public abstract class TestAssignmentManagerBase { @Override protected RegionOpeningState execOpenRegion(final ServerName server, RegionOpenInfo openReq) throws IOException { - switch (rand.nextInt(6)) { + RegionInfo hri = ProtobufUtil.toRegionInfo(openReq.getRegion()); + long previousOpenSeqNum = + am.getRegionStates().getOrCreateRegionStateNode(hri).getOpenSeqNum(); + switch (rand.nextInt(3)) { case 0: LOG.info("Return OPENED response"); - sendTransitionReport(server, openReq.getRegion(), TransitionCode.OPENED); + sendTransitionReport(server, openReq.getRegion(), TransitionCode.OPENED, + previousOpenSeqNum + 2); return OpenRegionResponse.RegionOpeningState.OPENED; case 1: - LOG.info("Return transition report that OPENED/ALREADY_OPENED response"); - sendTransitionReport(server, openReq.getRegion(), TransitionCode.OPENED); - return OpenRegionResponse.RegionOpeningState.ALREADY_OPENED; - case 2: LOG.info("Return transition report that FAILED_OPEN/FAILED_OPENING response"); - sendTransitionReport(server, openReq.getRegion(), TransitionCode.FAILED_OPEN); + sendTransitionReport(server, openReq.getRegion(), TransitionCode.FAILED_OPEN, -1); return OpenRegionResponse.RegionOpeningState.FAILED_OPENING; default: // fall out @@ -534,7 +537,7 @@ public abstract class TestAssignmentManagerBase { boolean closed = rand.nextBoolean(); if (closed) { RegionInfo hri = am.getRegionInfo(regionName); - sendTransitionReport(server, ProtobufUtil.toRegionInfo(hri), TransitionCode.CLOSED); + sendTransitionReport(server, ProtobufUtil.toRegionInfo(hri), TransitionCode.CLOSED, -1); } resp.setClosed(closed); return resp.build(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestReportRegionStateTransitionRetry.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestReportRegionStateTransitionRetry.java new file mode 100644 index 00000000000..7505b58794f --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestReportRegionStateTransitionRetry.java @@ -0,0 +1,146 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.assignment; + +import static org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RegionStateTransitionState.REGION_STATE_TRANSITION_CONFIRM_OPENED_VALUE; + +import java.io.IOException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.PleaseHoldException; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.master.HMaster; +import org.apache.hadoop.hbase.master.MasterServices; +import org.apache.hadoop.hbase.master.RegionPlan; +import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; +import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.testclassification.MasterTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.zookeeper.KeeperException; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse; + +@Category({ MasterTests.class, LargeTests.class }) +public class TestReportRegionStateTransitionRetry { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestReportRegionStateTransitionRetry.class); + + private static final AtomicReference RESUME_AND_FAIL = new AtomicReference<>(); + + private static final class AssignmentManagerForTest extends AssignmentManager { + + public AssignmentManagerForTest(MasterServices master) { + super(master); + } + + @Override + public ReportRegionStateTransitionResponse reportRegionStateTransition( + ReportRegionStateTransitionRequest req) throws PleaseHoldException { + ReportRegionStateTransitionResponse resp = super.reportRegionStateTransition(req); + CountDownLatch latch = RESUME_AND_FAIL.getAndSet(null); + if (latch != null) { + try { + latch.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + throw new PleaseHoldException("Inject error"); + } + return resp; + } + } + + public static final class HMasterForTest extends HMaster { + + public HMasterForTest(Configuration conf) throws IOException, KeeperException { + super(conf); + } + + @Override + protected AssignmentManager createAssignmentManager(MasterServices master) { + return new AssignmentManagerForTest(master); + } + } + + private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); + + private static TableName NAME = TableName.valueOf("Retry"); + + private static byte[] CF = Bytes.toBytes("cf"); + + @BeforeClass + public static void setUp() throws Exception { + UTIL.getConfiguration().setClass(HConstants.MASTER_IMPL, HMasterForTest.class, HMaster.class); + UTIL.startMiniCluster(1); + UTIL.createTable(NAME, CF); + UTIL.waitTableAvailable(NAME); + } + + @AfterClass + public static void tearDown() throws Exception { + UTIL.shutdownMiniCluster(); + } + + @Test + public void testRetryOnClose() throws Exception { + RegionInfo region = UTIL.getMiniHBaseCluster().getRegions(NAME).get(0).getRegionInfo(); + ProcedureExecutor procExec = + UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor(); + AssignmentManager am = UTIL.getMiniHBaseCluster().getMaster().getAssignmentManager(); + RegionStateNode rsn = am.getRegionStates().getRegionStateNode(region); + + CountDownLatch latch = new CountDownLatch(1); + RESUME_AND_FAIL.set(latch); + Future future = + am.moveAsync(new RegionPlan(region, rsn.getRegionLocation(), rsn.getRegionLocation())); + TransitRegionStateProcedure proc = + procExec.getProcedures().stream().filter(p -> p instanceof TransitRegionStateProcedure) + .filter(p -> !p.isFinished()).map(p -> (TransitRegionStateProcedure) p).findAny().get(); + + // wait until we schedule the OpenRegionProcedure + UTIL.waitFor(10000, + () -> proc.getCurrentStateId() == REGION_STATE_TRANSITION_CONFIRM_OPENED_VALUE); + // Fail the reportRegionStateTransition for closing + latch.countDown(); + future.get(); + + // confirm that the region can still be write + try (Table table = UTIL.getConnection().getTableBuilder(NAME, null).setWriteRpcTimeout(1000) + .setOperationTimeout(2000).build()) { + table.put( + new Put(Bytes.toBytes("key")).addColumn(CF, Bytes.toBytes("cq"), Bytes.toBytes("val"))); + } + } +}