From 3baafbed52413de48c55d86a7cc0275969902e35 Mon Sep 17 00:00:00 2001 From: zhangduo Date: Fri, 28 Sep 2018 20:20:29 +0800 Subject: [PATCH] HBASE-21248 Implement exponential backoff when retrying for ModifyPeerProcedure --- .../replication/ModifyPeerProcedure.java | 81 ++++++--- .../hadoop/hbase/ProcedureTestUtil.java | 85 +++++++++ .../TestCloseRegionWhileRSCrash.java | 53 +----- .../TestModifyPeerProcedureRetryBackoff.java | 166 ++++++++++++++++++ 4 files changed, 317 insertions(+), 68 deletions(-) create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/ProcedureTestUtil.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestModifyPeerProcedureRetryBackoff.java diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java index ad4df610e1f..9add1f2096c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java @@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch; import org.apache.hadoop.hbase.master.procedure.ReopenTableRegionsProcedure; import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; +import org.apache.hadoop.hbase.procedure2.ProcedureUtil; import org.apache.hadoop.hbase.procedure2.ProcedureYieldException; import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; @@ -42,7 +43,10 @@ import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; + import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.PeerModificationState; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos; /** * The base class for all replication peer related procedure except sync replication state @@ -58,6 +62,8 @@ public abstract class ModifyPeerProcedure extends AbstractPeerProcedure lastSeqIds, String encodedRegionName, long barrier, ReplicationQueueStorage queueStorage) throws ReplicationException { if (barrier >= 0) { @@ -235,9 +249,24 @@ public abstract class ModifyPeerProcedure extends AbstractPeerProcedure getProcedure(HBaseTestingUtility util, + Class> clazz, JsonParser parser) throws IOException { + JsonArray array = parser.parse(util.getAdmin().getProcedures()).getAsJsonArray(); + Iterator iterator = array.iterator(); + while (iterator.hasNext()) { + JsonElement element = iterator.next(); + JsonObject obj = element.getAsJsonObject(); + String className = obj.get("className").getAsString(); + if (className.equals(clazz.getName())) { + return Optional.of(obj); + } + } + return Optional.empty(); + } + + public static void waitUntilProcedureWaitingTimeout(HBaseTestingUtility util, + Class> clazz, long timeout) throws IOException { + JsonParser parser = new JsonParser(); + util.waitFor(timeout, + () -> getProcedure(util, clazz, parser) + .filter(o -> ProcedureState.WAITING_TIMEOUT.name().equals(o.get("state").getAsString())) + .isPresent()); + } + + public static void waitUntilProcedureTimeoutIncrease(HBaseTestingUtility util, + Class> clazz, int times) throws IOException, InterruptedException { + JsonParser parser = new JsonParser(); + long oldTimeout = 0; + int timeoutIncrements = 0; + for (;;) { + long timeout = getProcedure(util, clazz, parser).filter(o -> o.has("timeout")) + .map(o -> o.get("timeout").getAsLong()).orElse(-1L); + if (timeout > oldTimeout) { + LOG.info("Timeout incremented, was {}, now is {}, increments={}", timeout, oldTimeout, + timeoutIncrements); + oldTimeout = timeout; + timeoutIncrements++; + if (timeoutIncrements > times) { + break; + } + } + Thread.sleep(1000); + } + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestCloseRegionWhileRSCrash.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestCloseRegionWhileRSCrash.java index 3573bd661e9..d34bfbb4f2c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestCloseRegionWhileRSCrash.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestCloseRegionWhileRSCrash.java @@ -18,10 +18,10 @@ package org.apache.hadoop.hbase.master.assignment; import java.io.IOException; -import java.util.Iterator; import java.util.concurrent.CountDownLatch; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.ProcedureTestUtil; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Put; @@ -45,13 +45,6 @@ import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.Test; import org.junit.experimental.categories.Category; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.hbase.thirdparty.com.google.gson.JsonArray; -import org.apache.hbase.thirdparty.com.google.gson.JsonElement; -import org.apache.hbase.thirdparty.com.google.gson.JsonObject; -import org.apache.hbase.thirdparty.com.google.gson.JsonParser; /** * Confirm that we will do backoff when retrying on closing a region, to avoid consuming all the @@ -64,8 +57,6 @@ public class TestCloseRegionWhileRSCrash { public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestCloseRegionWhileRSCrash.class); - private static final Logger LOG = LoggerFactory.getLogger(TestCloseRegionWhileRSCrash.class); - private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); private static TableName TABLE_NAME = TableName.valueOf("Backoff"); @@ -189,25 +180,11 @@ public class TestCloseRegionWhileRSCrash { } }); t.start(); - JsonParser parser = new JsonParser(); - long oldTimeout = 0; - int timeoutIncrements = 0; // wait until we enter the WAITING_TIMEOUT state - UTIL.waitFor(30000, () -> getTimeout(parser, UTIL.getAdmin().getProcedures()) > 0); - while (true) { - long timeout = getTimeout(parser, UTIL.getAdmin().getProcedures()); - if (timeout > oldTimeout) { - LOG.info("Timeout incremented, was {}, now is {}, increments={}", timeout, oldTimeout, - timeoutIncrements); - oldTimeout = timeout; - timeoutIncrements++; - if (timeoutIncrements > 3) { - // If we incremented at least twice, break; the backoff is working. - break; - } - } - Thread.sleep(1000); - } + ProcedureTestUtil.waitUntilProcedureWaitingTimeout(UTIL, TransitRegionStateProcedure.class, + 30000); + // wait until the timeout value increase three times + ProcedureTestUtil.waitUntilProcedureTimeoutIncrease(UTIL, TransitRegionStateProcedure.class, 3); // let's close the connection to make sure that the SCP can not update meta successfully UTIL.getMiniHBaseCluster().getMaster().getConnection().close(); RESUME.countDown(); @@ -223,24 +200,4 @@ public class TestCloseRegionWhileRSCrash { table.put(new Put(Bytes.toBytes(1)).addColumn(CF, Bytes.toBytes("cq"), Bytes.toBytes(1))); } } - - /** - * @param proceduresAsJSON This is String returned by admin.getProcedures call... an array of - * Procedures as JSON. - * @return The Procedure timeout value parsed from the TRSP. - */ - private long getTimeout(JsonParser parser, String proceduresAsJSON) { - JsonArray array = parser.parse(proceduresAsJSON).getAsJsonArray(); - Iterator iterator = array.iterator(); - while (iterator.hasNext()) { - JsonElement element = iterator.next(); - JsonObject obj = element.getAsJsonObject(); - String className = obj.get("className").getAsString(); - String actualClassName = TransitRegionStateProcedure.class.getName(); - if (className.equals(actualClassName) && obj.has("timeout")) { - return obj.get("timeout").getAsLong(); - } - } - return -1L; - } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestModifyPeerProcedureRetryBackoff.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestModifyPeerProcedureRetryBackoff.java new file mode 100644 index 00000000000..7566d28e810 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestModifyPeerProcedureRetryBackoff.java @@ -0,0 +1,166 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.replication; + +import java.io.IOException; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.ProcedureTestUtil; +import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; +import org.apache.hadoop.hbase.procedure2.Procedure; +import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; +import org.apache.hadoop.hbase.replication.ReplicationException; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.testclassification.MasterTests; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.PeerModificationState; + +@Category({ MasterTests.class, LargeTests.class }) +public class TestModifyPeerProcedureRetryBackoff { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestModifyPeerProcedureRetryBackoff.class); + + private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); + + private static boolean FAIL = true; + + public static class TestModifyPeerProcedure extends ModifyPeerProcedure { + + public TestModifyPeerProcedure() { + } + + public TestModifyPeerProcedure(String peerId) { + super(peerId); + } + + @Override + public PeerOperationType getPeerOperationType() { + return PeerOperationType.ADD; + } + + private void tryFail() throws ReplicationException { + synchronized (TestModifyPeerProcedureRetryBackoff.class) { + if (FAIL) { + throw new ReplicationException("Inject error"); + } + FAIL = true; + } + } + + @Override + protected > void addChildProcedure( + @SuppressWarnings("unchecked") T... subProcedure) { + // Make it a no-op + } + + @Override + protected PeerModificationState nextStateAfterRefresh() { + return PeerModificationState.SERIAL_PEER_REOPEN_REGIONS; + } + + @Override + protected boolean enablePeerBeforeFinish() { + return true; + } + + @Override + protected void updateLastPushedSequenceIdForSerialPeer(MasterProcedureEnv env) + throws IOException, ReplicationException { + tryFail(); + } + + @Override + protected void reopenRegions(MasterProcedureEnv env) throws IOException { + try { + tryFail(); + } catch (ReplicationException e) { + throw new IOException(e); + } + } + + @Override + protected void enablePeer(MasterProcedureEnv env) throws ReplicationException { + tryFail(); + } + + @Override + protected void prePeerModification(MasterProcedureEnv env) + throws IOException, ReplicationException { + tryFail(); + } + + @Override + protected void updatePeerStorage(MasterProcedureEnv env) throws ReplicationException { + tryFail(); + } + + @Override + protected void postPeerModification(MasterProcedureEnv env) + throws IOException, ReplicationException { + tryFail(); + } + } + + @BeforeClass + public static void setUp() throws Exception { + UTIL.startMiniCluster(1); + } + + @AfterClass + public static void tearDown() throws Exception { + UTIL.shutdownMiniCluster(); + } + + private void assertBackoffIncrease() throws IOException, InterruptedException { + ProcedureTestUtil.waitUntilProcedureWaitingTimeout(UTIL, TestModifyPeerProcedure.class, 30000); + ProcedureTestUtil.waitUntilProcedureTimeoutIncrease(UTIL, TestModifyPeerProcedure.class, 2); + synchronized (TestModifyPeerProcedureRetryBackoff.class) { + FAIL = false; + } + UTIL.waitFor(30000, () -> FAIL); + } + + @Test + public void test() throws IOException, InterruptedException { + ProcedureExecutor procExec = + UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor(); + long procId = procExec.submitProcedure(new TestModifyPeerProcedure("1")); + // PRE_PEER_MODIFICATION + assertBackoffIncrease(); + // UPDATE_PEER_STORAGE + assertBackoffIncrease(); + // No retry for REFRESH_PEER_ON_RS + // SERIAL_PEER_REOPEN_REGIONS + assertBackoffIncrease(); + // SERIAL_PEER_UPDATE_LAST_PUSHED_SEQ_ID + assertBackoffIncrease(); + // SERIAL_PEER_SET_PEER_ENABLED + assertBackoffIncrease(); + // No retry for SERIAL_PEER_ENABLE_PEER_REFRESH_PEER_ON_RS + // POST_PEER_MODIFICATION + assertBackoffIncrease(); + UTIL.waitFor(30000, () -> procExec.isFinished(procId)); + } +}