diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/TransitRegionStateProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/TransitRegionStateProcedure.java index 2bfe1aff503..df1503ba5df 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/TransitRegionStateProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/TransitRegionStateProcedure.java @@ -17,9 +17,6 @@ */ package org.apache.hadoop.hbase.master.assignment; -import static org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RegionStateTransitionState.REGION_STATE_TRANSITION_CONFIRM_CLOSED; -import static org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RegionStateTransitionState.REGION_STATE_TRANSITION_CONFIRM_OPENED; - import edu.umd.cs.findbugs.annotations.Nullable; import java.io.IOException; import org.apache.hadoop.hbase.HBaseIOException; @@ -43,7 +40,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; - import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RegionStateTransitionState; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RegionStateTransitionStateData; @@ -350,129 +346,71 @@ public class TransitRegionStateProcedure return false; // 'false' means that this procedure handled the timeout } - private boolean isOpening(RegionStateNode regionNode, ServerName serverName, - TransitionCode code) { - if (!regionNode.isInState(State.OPENING)) { - LOG.warn("Received report {} transition from {}, pid={}, but the region {} is not in" + - " OPENING state, should be a retry, ignore", code, serverName, getProcId(), regionNode); - return false; + private void reportTransitionOpened(MasterProcedureEnv env, RegionStateNode regionNode, + ServerName serverName, TransitionCode code, long openSeqNum) throws IOException { + switch (code) { + case OPENED: + if (openSeqNum < 0) { + throw new UnexpectedStateException("Received report unexpected " + code + + " transition openSeqNum=" + openSeqNum + ", " + regionNode); + } + if (openSeqNum <= regionNode.getOpenSeqNum()) { + if (openSeqNum != 0) { + LOG.warn("Skip update of openSeqNum for {} with {} because the currentSeqNum={}", + regionNode, openSeqNum, regionNode.getOpenSeqNum()); + } + } else { + regionNode.setOpenSeqNum(openSeqNum); + } + env.getAssignmentManager().regionOpened(regionNode); + if (lastState == RegionStateTransitionState.REGION_STATE_TRANSITION_CONFIRM_OPENED) { + // we are done + regionNode.unsetProcedure(this); + } + regionNode.getProcedureEvent().wake(env.getProcedureScheduler()); + break; + case FAILED_OPEN: + // just wake up the procedure and see if we can retry + regionNode.getProcedureEvent().wake(env.getProcedureScheduler()); + break; + default: + throw new UnexpectedStateException( + "Received report unexpected " + code + " transition openSeqNum=" + openSeqNum + ", " + + regionNode.toShortString() + ", " + this + ", expected OPENED or FAILED_OPEN."); } - if (getCurrentState() != REGION_STATE_TRANSITION_CONFIRM_OPENED) { - LOG.warn( - "Received report {} transition from {}, pid={}, but the TRSP is not in {} state," + - " should be a retry, ignore", - code, serverName, getProcId(), REGION_STATE_TRANSITION_CONFIRM_OPENED); - return false; - } - return true; - } - - private void reportTransitionOpen(MasterProcedureEnv env, RegionStateNode regionNode, - ServerName serverName, long openSeqNum) throws IOException { - if (!isOpening(regionNode, serverName, TransitionCode.OPENED)) { - return; - } - if (openSeqNum < 0) { - throw new UnexpectedStateException("Received report unexpected " + TransitionCode.OPENED + - " transition openSeqNum=" + openSeqNum + ", " + regionNode + ", proc=" + this); - } - if (openSeqNum <= regionNode.getOpenSeqNum()) { - // use the openSeqNum as a fence, if this is not a retry, then the openSeqNum should be - // greater than the existing one. - if (openSeqNum != 0) { - LOG.warn("Skip update of region state for {} with openSeqNum={}, pid={} because the" + - " currentSeqNum={}", regionNode, openSeqNum, getProcId(), regionNode.getOpenSeqNum()); - return; - } - } else { - regionNode.setOpenSeqNum(openSeqNum); - } - env.getAssignmentManager().regionOpened(regionNode); - if (lastState == RegionStateTransitionState.REGION_STATE_TRANSITION_CONFIRM_OPENED) { - // we are done - regionNode.unsetProcedure(this); - } - regionNode.getProcedureEvent().wake(env.getProcedureScheduler()); - } - - private void reportTransitionFailedOpen(MasterProcedureEnv env, RegionStateNode regionNode, - ServerName serverName) { - if (!isOpening(regionNode, serverName, TransitionCode.FAILED_OPEN)) { - return; - } - // there is no openSeqNum for FAILED_OPEN, so we will check the target server instead - if (!regionNode.getRegionLocation().equals(serverName)) { - LOG.warn( - "Received report {} transition from {}, pid={}, but the region {} is not on it," + - " should be a retry, ignore", - TransitionCode.FAILED_OPEN, serverName, getProcId(), regionNode); - return; - } - // just wake up the procedure and see if we can retry - // Notice that, even if we arrive here, this call could still be a retry, as we may retry - // opening on the same server again. And the assumption here is that, once the region state is - // OPENING, and the TRSP state is REGION_STATE_TRANSITION_CONFIRM_OPENED, the TRSP must have - // been suspended on the procedure event, so after the waking operation here, the TRSP will be - // executed and try to schedule new OpenRegionProcedure again. Once there is a successful open - // then we are done, so the TRSP will not be stuck. - // TODO: maybe we could send the procedure id of the OpenRegionProcedure to the region server - // and let the region server send it back when done, so it will be easy to detect whether this - // is a retry. - regionNode.getProcedureEvent().wake(env.getProcedureScheduler()); } // we do not need seqId for closing a region private void reportTransitionClosed(MasterProcedureEnv env, RegionStateNode regionNode, - ServerName serverName) throws IOException { - if (!regionNode.isInState(State.CLOSING)) { - LOG.warn( - "Received report {} transition from {}, pid={}, but the region {} is not in" + - " CLOSING state, should be a retry, ignore", - TransitionCode.CLOSED, serverName, getProcId(), regionNode); - return; + ServerName serverName, TransitionCode code) throws IOException { + switch (code) { + case CLOSED: + env.getAssignmentManager().regionClosed(regionNode, true); + if (lastState == RegionStateTransitionState.REGION_STATE_TRANSITION_CONFIRM_CLOSED) { + // we are done + regionNode.unsetProcedure(this); + } + regionNode.getProcedureEvent().wake(env.getProcedureScheduler()); + break; + default: + throw new UnexpectedStateException("Received report unexpected " + code + " transition, " + + regionNode.toShortString() + ", " + this + ", expected CLOSED."); } - if (getCurrentState() != REGION_STATE_TRANSITION_CONFIRM_CLOSED) { - LOG.warn( - "Received report {} transition from {}, pid={} but the proc is not in {}" + - " state, should be a retry, ignore", - TransitionCode.CLOSED, serverName, getProcId(), REGION_STATE_TRANSITION_CONFIRM_CLOSED); - return; - } - if (!regionNode.getRegionLocation().equals(serverName)) { - LOG.warn( - "Received report {} transition from {}, pid={}, but the region {} is not on it," + - " should be a retry, ignore", - TransitionCode.CLOSED, serverName, getProcId(), regionNode); - return; - } - env.getAssignmentManager().regionClosed(regionNode, true); - if (lastState == RegionStateTransitionState.REGION_STATE_TRANSITION_CONFIRM_CLOSED) { - // we are done - regionNode.unsetProcedure(this); - } - regionNode.getProcedureEvent().wake(env.getProcedureScheduler()); } // Should be called with RegionStateNode locked public void reportTransition(MasterProcedureEnv env, RegionStateNode regionNode, ServerName serverName, TransitionCode code, long seqId) throws IOException { - // It is possible that the previous reportRegionStateTransition call was succeeded at master - // side, but before returning the result to region server, the rpc connection was broken, or the - // master restarted. The region server will try calling reportRegionStateTransition again under - // this scenario, so here we need to check whether this is a retry. - switch (code) { - case OPENED: - reportTransitionOpen(env, regionNode, serverName, seqId); + switch (getCurrentState()) { + case REGION_STATE_TRANSITION_CONFIRM_OPENED: + reportTransitionOpened(env, regionNode, serverName, code, seqId); break; - case FAILED_OPEN: - reportTransitionFailedOpen(env, regionNode, serverName); - break; - case CLOSED: - reportTransitionClosed(env, regionNode, serverName); + case REGION_STATE_TRANSITION_CONFIRM_CLOSED: + reportTransitionClosed(env, regionNode, serverName, code); break; default: - throw new UnexpectedStateException("Received report unexpected " + code + " transition, " + - regionNode.toShortString() + ", " + this + ", expected OPENED or FAILED_OPEN or CLOSED."); + LOG.warn("{} received unexpected report transition call from {}, code={}, seqId={}", this, + serverName, code, seqId); } } @@ -540,8 +478,9 @@ public class TransitRegionStateProcedure @Override protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException { super.serializeStateData(serializer); - RegionStateTransitionStateData.Builder builder = RegionStateTransitionStateData.newBuilder() - .setInitialState(initialState).setLastState(lastState).setForceNewPlan(forceNewPlan); + RegionStateTransitionStateData.Builder builder = + RegionStateTransitionStateData.newBuilder().setInitialState(initialState) + .setLastState(lastState).setForceNewPlan(forceNewPlan); if (assignCandidate != null) { builder.setAssignCandidate(ProtobufUtil.toServerName(assignCandidate)); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManagerBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManagerBase.java index 5f5a5766d28..1c97f372227 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManagerBase.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManagerBase.java @@ -261,12 +261,12 @@ public abstract class TestAssignmentManagerBase { protected void sendTransitionReport(final ServerName serverName, final org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionInfo regionInfo, - final TransitionCode state, long seqId) throws IOException { + final TransitionCode state) throws IOException { ReportRegionStateTransitionRequest.Builder req = ReportRegionStateTransitionRequest.newBuilder(); req.setServer(ProtobufUtil.toServerName(serverName)); req.addTransition(RegionStateTransition.newBuilder().addRegionInfo(regionInfo) - .setTransitionCode(state).setOpenSeqNum(seqId).build()); + .setTransitionCode(state).setOpenSeqNum(1).build()); am.reportRegionStateTransition(req.build()); } @@ -286,11 +286,7 @@ public abstract class TestAssignmentManagerBase { @Override protected RegionOpeningState execOpenRegion(ServerName server, RegionOpenInfo openReq) throws IOException { - RegionInfo hri = ProtobufUtil.toRegionInfo(openReq.getRegion()); - long previousOpenSeqNum = - am.getRegionStates().getOrCreateRegionStateNode(hri).getOpenSeqNum(); - sendTransitionReport(server, openReq.getRegion(), TransitionCode.OPENED, - previousOpenSeqNum + 2); + sendTransitionReport(server, openReq.getRegion(), TransitionCode.OPENED); // Concurrency? // Now update the state of our cluster in regionsToRegionServers. SortedSet regions = regionsToRegionServers.get(server); @@ -298,6 +294,7 @@ public abstract class TestAssignmentManagerBase { regions = new ConcurrentSkipListSet(Bytes.BYTES_COMPARATOR); regionsToRegionServers.put(server, regions); } + RegionInfo hri = ProtobufUtil.toRegionInfo(openReq.getRegion()); if (regions.contains(hri.getRegionName())) { throw new UnsupportedOperationException(hri.getRegionNameAsString()); } @@ -309,7 +306,7 @@ public abstract class TestAssignmentManagerBase { protected CloseRegionResponse execCloseRegion(ServerName server, byte[] regionName) throws IOException { RegionInfo hri = am.getRegionInfo(regionName); - sendTransitionReport(server, ProtobufUtil.toRegionInfo(hri), TransitionCode.CLOSED, -1); + sendTransitionReport(server, ProtobufUtil.toRegionInfo(hri), TransitionCode.CLOSED); return CloseRegionResponse.newBuilder().setClosed(true).build(); } } @@ -500,18 +497,18 @@ public abstract class TestAssignmentManagerBase { @Override protected RegionOpeningState execOpenRegion(final ServerName server, RegionOpenInfo openReq) throws IOException { - RegionInfo hri = ProtobufUtil.toRegionInfo(openReq.getRegion()); - long previousOpenSeqNum = - am.getRegionStates().getOrCreateRegionStateNode(hri).getOpenSeqNum(); - switch (rand.nextInt(3)) { + switch (rand.nextInt(6)) { case 0: LOG.info("Return OPENED response"); - sendTransitionReport(server, openReq.getRegion(), TransitionCode.OPENED, - previousOpenSeqNum + 2); + sendTransitionReport(server, openReq.getRegion(), TransitionCode.OPENED); return OpenRegionResponse.RegionOpeningState.OPENED; case 1: + LOG.info("Return transition report that OPENED/ALREADY_OPENED response"); + sendTransitionReport(server, openReq.getRegion(), TransitionCode.OPENED); + return OpenRegionResponse.RegionOpeningState.ALREADY_OPENED; + case 2: LOG.info("Return transition report that FAILED_OPEN/FAILED_OPENING response"); - sendTransitionReport(server, openReq.getRegion(), TransitionCode.FAILED_OPEN, -1); + sendTransitionReport(server, openReq.getRegion(), TransitionCode.FAILED_OPEN); return OpenRegionResponse.RegionOpeningState.FAILED_OPENING; default: // fall out @@ -537,7 +534,7 @@ public abstract class TestAssignmentManagerBase { boolean closed = rand.nextBoolean(); if (closed) { RegionInfo hri = am.getRegionInfo(regionName); - sendTransitionReport(server, ProtobufUtil.toRegionInfo(hri), TransitionCode.CLOSED, -1); + sendTransitionReport(server, ProtobufUtil.toRegionInfo(hri), TransitionCode.CLOSED); } resp.setClosed(closed); return resp.build(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestReportRegionStateTransitionRetry.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestReportRegionStateTransitionRetry.java deleted file mode 100644 index 7505b58794f..00000000000 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestReportRegionStateTransitionRetry.java +++ /dev/null @@ -1,146 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.master.assignment; - -import static org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RegionStateTransitionState.REGION_STATE_TRANSITION_CONFIRM_OPENED_VALUE; - -import java.io.IOException; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Future; -import java.util.concurrent.atomic.AtomicReference; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HBaseClassTestRule; -import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.PleaseHoldException; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.RegionInfo; -import org.apache.hadoop.hbase.client.Table; -import org.apache.hadoop.hbase.master.HMaster; -import org.apache.hadoop.hbase.master.MasterServices; -import org.apache.hadoop.hbase.master.RegionPlan; -import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; -import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; -import org.apache.hadoop.hbase.testclassification.LargeTests; -import org.apache.hadoop.hbase.testclassification.MasterTests; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.zookeeper.KeeperException; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.ClassRule; -import org.junit.Test; -import org.junit.experimental.categories.Category; - -import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse; - -@Category({ MasterTests.class, LargeTests.class }) -public class TestReportRegionStateTransitionRetry { - - @ClassRule - public static final HBaseClassTestRule CLASS_RULE = - HBaseClassTestRule.forClass(TestReportRegionStateTransitionRetry.class); - - private static final AtomicReference RESUME_AND_FAIL = new AtomicReference<>(); - - private static final class AssignmentManagerForTest extends AssignmentManager { - - public AssignmentManagerForTest(MasterServices master) { - super(master); - } - - @Override - public ReportRegionStateTransitionResponse reportRegionStateTransition( - ReportRegionStateTransitionRequest req) throws PleaseHoldException { - ReportRegionStateTransitionResponse resp = super.reportRegionStateTransition(req); - CountDownLatch latch = RESUME_AND_FAIL.getAndSet(null); - if (latch != null) { - try { - latch.await(); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - throw new PleaseHoldException("Inject error"); - } - return resp; - } - } - - public static final class HMasterForTest extends HMaster { - - public HMasterForTest(Configuration conf) throws IOException, KeeperException { - super(conf); - } - - @Override - protected AssignmentManager createAssignmentManager(MasterServices master) { - return new AssignmentManagerForTest(master); - } - } - - private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); - - private static TableName NAME = TableName.valueOf("Retry"); - - private static byte[] CF = Bytes.toBytes("cf"); - - @BeforeClass - public static void setUp() throws Exception { - UTIL.getConfiguration().setClass(HConstants.MASTER_IMPL, HMasterForTest.class, HMaster.class); - UTIL.startMiniCluster(1); - UTIL.createTable(NAME, CF); - UTIL.waitTableAvailable(NAME); - } - - @AfterClass - public static void tearDown() throws Exception { - UTIL.shutdownMiniCluster(); - } - - @Test - public void testRetryOnClose() throws Exception { - RegionInfo region = UTIL.getMiniHBaseCluster().getRegions(NAME).get(0).getRegionInfo(); - ProcedureExecutor procExec = - UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor(); - AssignmentManager am = UTIL.getMiniHBaseCluster().getMaster().getAssignmentManager(); - RegionStateNode rsn = am.getRegionStates().getRegionStateNode(region); - - CountDownLatch latch = new CountDownLatch(1); - RESUME_AND_FAIL.set(latch); - Future future = - am.moveAsync(new RegionPlan(region, rsn.getRegionLocation(), rsn.getRegionLocation())); - TransitRegionStateProcedure proc = - procExec.getProcedures().stream().filter(p -> p instanceof TransitRegionStateProcedure) - .filter(p -> !p.isFinished()).map(p -> (TransitRegionStateProcedure) p).findAny().get(); - - // wait until we schedule the OpenRegionProcedure - UTIL.waitFor(10000, - () -> proc.getCurrentStateId() == REGION_STATE_TRANSITION_CONFIRM_OPENED_VALUE); - // Fail the reportRegionStateTransition for closing - latch.countDown(); - future.get(); - - // confirm that the region can still be write - try (Table table = UTIL.getConnection().getTableBuilder(NAME, null).setWriteRpcTimeout(1000) - .setOperationTimeout(2000).build()) { - table.put( - new Put(Bytes.toBytes("key")).addColumn(CF, Bytes.toBytes("cq"), Bytes.toBytes("val"))); - } - } -}