HBASE-21248 Implement exponential backoff when retrying for ModifyPeerProcedure
This commit is contained in:
parent
22ac655704
commit
3baafbed52
|
@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
|
|||
import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch;
|
||||
import org.apache.hadoop.hbase.master.procedure.ReopenTableRegionsProcedure;
|
||||
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
|
||||
import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
|
||||
import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationException;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
|
||||
|
@ -42,7 +43,10 @@ import org.apache.yetus.audience.InterfaceAudience;
|
|||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.PeerModificationState;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
|
||||
|
||||
/**
|
||||
* The base class for all replication peer related procedure except sync replication state
|
||||
|
@ -58,6 +62,8 @@ public abstract class ModifyPeerProcedure extends AbstractPeerProcedure<PeerModi
|
|||
// The sleep interval when waiting table to be enabled or disabled.
|
||||
protected static final int SLEEP_INTERVAL_MS = 1000;
|
||||
|
||||
private int attemps;
|
||||
|
||||
protected ModifyPeerProcedure() {
|
||||
}
|
||||
|
||||
|
@ -143,7 +149,9 @@ public abstract class ModifyPeerProcedure extends AbstractPeerProcedure<PeerModi
|
|||
}
|
||||
}
|
||||
|
||||
private void reopenRegions(MasterProcedureEnv env) throws IOException {
|
||||
// will be override in test to simulate error
|
||||
@VisibleForTesting
|
||||
protected void reopenRegions(MasterProcedureEnv env) throws IOException {
|
||||
ReplicationPeerConfig peerConfig = getNewPeerConfig();
|
||||
ReplicationPeerConfig oldPeerConfig = getOldPeerConfig();
|
||||
TableStateManager tsm = env.getMasterServices().getTableStateManager();
|
||||
|
@ -165,6 +173,12 @@ public abstract class ModifyPeerProcedure extends AbstractPeerProcedure<PeerModi
|
|||
}
|
||||
}
|
||||
|
||||
// will be override in test to simulate error
|
||||
@VisibleForTesting
|
||||
protected void enablePeer(MasterProcedureEnv env) throws ReplicationException {
|
||||
env.getReplicationPeerManager().enablePeer(peerId);
|
||||
}
|
||||
|
||||
private void addToMap(Map<String, Long> lastSeqIds, String encodedRegionName, long barrier,
|
||||
ReplicationQueueStorage queueStorage) throws ReplicationException {
|
||||
if (barrier >= 0) {
|
||||
|
@ -235,9 +249,24 @@ public abstract class ModifyPeerProcedure extends AbstractPeerProcedure<PeerModi
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected synchronized boolean setTimeoutFailure(MasterProcedureEnv env) {
|
||||
setState(ProcedureProtos.ProcedureState.RUNNABLE);
|
||||
env.getProcedureScheduler().addFront(this);
|
||||
return false;
|
||||
}
|
||||
|
||||
private ProcedureSuspendedException suspend(long backoff) throws ProcedureSuspendedException {
|
||||
attemps++;
|
||||
setTimeout(Math.toIntExact(backoff));
|
||||
setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT);
|
||||
skipPersistence();
|
||||
throw new ProcedureSuspendedException();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Flow executeFromState(MasterProcedureEnv env, PeerModificationState state)
|
||||
throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
|
||||
throws ProcedureSuspendedException {
|
||||
switch (state) {
|
||||
case PRE_PEER_MODIFICATION:
|
||||
try {
|
||||
|
@ -249,20 +278,24 @@ public abstract class ModifyPeerProcedure extends AbstractPeerProcedure<PeerModi
|
|||
releaseLatch();
|
||||
return Flow.NO_MORE_STATE;
|
||||
} catch (ReplicationException e) {
|
||||
LOG.warn("{} failed to call prePeerModification for peer {}, retry", getClass().getName(),
|
||||
peerId, e);
|
||||
throw new ProcedureYieldException();
|
||||
long backoff = ProcedureUtil.getBackoffTimeMs(attemps);
|
||||
LOG.warn("{} failed to call prePeerModification for peer {}, sleep {} secs",
|
||||
getClass().getName(), peerId, backoff / 1000, e);
|
||||
throw suspend(backoff);
|
||||
}
|
||||
attemps = 0;
|
||||
setNextState(PeerModificationState.UPDATE_PEER_STORAGE);
|
||||
return Flow.HAS_MORE_STATE;
|
||||
case UPDATE_PEER_STORAGE:
|
||||
try {
|
||||
updatePeerStorage(env);
|
||||
} catch (ReplicationException e) {
|
||||
LOG.warn("{} update peer storage for peer {} failed, retry", getClass().getName(), peerId,
|
||||
e);
|
||||
throw new ProcedureYieldException();
|
||||
long backoff = ProcedureUtil.getBackoffTimeMs(attemps);
|
||||
LOG.warn("{} update peer storage for peer {} failed, sleep {} secs", getClass().getName(),
|
||||
peerId, backoff / 1000, e);
|
||||
throw suspend(backoff);
|
||||
}
|
||||
attemps = 0;
|
||||
setNextState(PeerModificationState.REFRESH_PEER_ON_RS);
|
||||
return Flow.HAS_MORE_STATE;
|
||||
case REFRESH_PEER_ON_RS:
|
||||
|
@ -273,30 +306,37 @@ public abstract class ModifyPeerProcedure extends AbstractPeerProcedure<PeerModi
|
|||
try {
|
||||
reopenRegions(env);
|
||||
} catch (Exception e) {
|
||||
LOG.warn("{} reopen regions for peer {} failed, retry", getClass().getName(), peerId, e);
|
||||
throw new ProcedureYieldException();
|
||||
long backoff = ProcedureUtil.getBackoffTimeMs(attemps);
|
||||
LOG.warn("{} reopen regions for peer {} failed, sleep {} secs", getClass().getName(),
|
||||
peerId, backoff / 1000, e);
|
||||
throw suspend(backoff);
|
||||
}
|
||||
attemps = 0;
|
||||
setNextState(PeerModificationState.SERIAL_PEER_UPDATE_LAST_PUSHED_SEQ_ID);
|
||||
return Flow.HAS_MORE_STATE;
|
||||
case SERIAL_PEER_UPDATE_LAST_PUSHED_SEQ_ID:
|
||||
try {
|
||||
updateLastPushedSequenceIdForSerialPeer(env);
|
||||
} catch (Exception e) {
|
||||
LOG.warn("{} set last sequence id for peer {} failed, retry", getClass().getName(),
|
||||
peerId, e);
|
||||
throw new ProcedureYieldException();
|
||||
long backoff = ProcedureUtil.getBackoffTimeMs(attemps);
|
||||
LOG.warn("{} set last sequence id for peer {} failed, sleep {} secs",
|
||||
getClass().getName(), peerId, backoff / 1000, e);
|
||||
throw suspend(backoff);
|
||||
}
|
||||
attemps = 0;
|
||||
setNextState(enablePeerBeforeFinish() ? PeerModificationState.SERIAL_PEER_SET_PEER_ENABLED
|
||||
: PeerModificationState.POST_PEER_MODIFICATION);
|
||||
return Flow.HAS_MORE_STATE;
|
||||
case SERIAL_PEER_SET_PEER_ENABLED:
|
||||
try {
|
||||
env.getReplicationPeerManager().enablePeer(peerId);
|
||||
enablePeer(env);
|
||||
} catch (ReplicationException e) {
|
||||
LOG.warn("{} enable peer before finish for peer {} failed, retry", getClass().getName(),
|
||||
peerId, e);
|
||||
throw new ProcedureYieldException();
|
||||
long backoff = ProcedureUtil.getBackoffTimeMs(attemps);
|
||||
LOG.warn("{} enable peer before finish for peer {} failed, sleep {} secs",
|
||||
getClass().getName(), peerId, backoff / 1000, e);
|
||||
throw suspend(backoff);
|
||||
}
|
||||
attemps = 0;
|
||||
setNextState(PeerModificationState.SERIAL_PEER_ENABLE_PEER_REFRESH_PEER_ON_RS);
|
||||
return Flow.HAS_MORE_STATE;
|
||||
case SERIAL_PEER_ENABLE_PEER_REFRESH_PEER_ON_RS:
|
||||
|
@ -307,9 +347,10 @@ public abstract class ModifyPeerProcedure extends AbstractPeerProcedure<PeerModi
|
|||
try {
|
||||
postPeerModification(env);
|
||||
} catch (ReplicationException e) {
|
||||
LOG.warn("{} failed to call postPeerModification for peer {}, retry",
|
||||
getClass().getName(), peerId, e);
|
||||
throw new ProcedureYieldException();
|
||||
long backoff = ProcedureUtil.getBackoffTimeMs(attemps);
|
||||
LOG.warn("{} failed to call postPeerModification for peer {}, sleep {} secs",
|
||||
getClass().getName(), peerId, backoff / 1000, e);
|
||||
throw suspend(backoff);
|
||||
} catch (IOException e) {
|
||||
LOG.warn("{} failed to call post CP hook for peer {}, " +
|
||||
"ignore since the procedure has already done", getClass().getName(), peerId, e);
|
||||
|
|
|
@ -0,0 +1,85 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Iterator;
|
||||
import java.util.Optional;
|
||||
import org.apache.hadoop.hbase.procedure2.Procedure;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hbase.thirdparty.com.google.gson.JsonArray;
|
||||
import org.apache.hbase.thirdparty.com.google.gson.JsonElement;
|
||||
import org.apache.hbase.thirdparty.com.google.gson.JsonObject;
|
||||
import org.apache.hbase.thirdparty.com.google.gson.JsonParser;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
|
||||
|
||||
public final class ProcedureTestUtil {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(ProcedureTestUtil.class);
|
||||
|
||||
private ProcedureTestUtil() {
|
||||
}
|
||||
|
||||
private static Optional<JsonObject> getProcedure(HBaseTestingUtility util,
|
||||
Class<? extends Procedure<?>> clazz, JsonParser parser) throws IOException {
|
||||
JsonArray array = parser.parse(util.getAdmin().getProcedures()).getAsJsonArray();
|
||||
Iterator<JsonElement> iterator = array.iterator();
|
||||
while (iterator.hasNext()) {
|
||||
JsonElement element = iterator.next();
|
||||
JsonObject obj = element.getAsJsonObject();
|
||||
String className = obj.get("className").getAsString();
|
||||
if (className.equals(clazz.getName())) {
|
||||
return Optional.of(obj);
|
||||
}
|
||||
}
|
||||
return Optional.empty();
|
||||
}
|
||||
|
||||
public static void waitUntilProcedureWaitingTimeout(HBaseTestingUtility util,
|
||||
Class<? extends Procedure<?>> clazz, long timeout) throws IOException {
|
||||
JsonParser parser = new JsonParser();
|
||||
util.waitFor(timeout,
|
||||
() -> getProcedure(util, clazz, parser)
|
||||
.filter(o -> ProcedureState.WAITING_TIMEOUT.name().equals(o.get("state").getAsString()))
|
||||
.isPresent());
|
||||
}
|
||||
|
||||
public static void waitUntilProcedureTimeoutIncrease(HBaseTestingUtility util,
|
||||
Class<? extends Procedure<?>> clazz, int times) throws IOException, InterruptedException {
|
||||
JsonParser parser = new JsonParser();
|
||||
long oldTimeout = 0;
|
||||
int timeoutIncrements = 0;
|
||||
for (;;) {
|
||||
long timeout = getProcedure(util, clazz, parser).filter(o -> o.has("timeout"))
|
||||
.map(o -> o.get("timeout").getAsLong()).orElse(-1L);
|
||||
if (timeout > oldTimeout) {
|
||||
LOG.info("Timeout incremented, was {}, now is {}, increments={}", timeout, oldTimeout,
|
||||
timeoutIncrements);
|
||||
oldTimeout = timeout;
|
||||
timeoutIncrements++;
|
||||
if (timeoutIncrements > times) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
Thread.sleep(1000);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -18,10 +18,10 @@
|
|||
package org.apache.hadoop.hbase.master.assignment;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Iterator;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.ProcedureTestUtil;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.Put;
|
||||
|
@ -45,13 +45,6 @@ import org.junit.BeforeClass;
|
|||
import org.junit.ClassRule;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hbase.thirdparty.com.google.gson.JsonArray;
|
||||
import org.apache.hbase.thirdparty.com.google.gson.JsonElement;
|
||||
import org.apache.hbase.thirdparty.com.google.gson.JsonObject;
|
||||
import org.apache.hbase.thirdparty.com.google.gson.JsonParser;
|
||||
|
||||
/**
|
||||
* Confirm that we will do backoff when retrying on closing a region, to avoid consuming all the
|
||||
|
@ -64,8 +57,6 @@ public class TestCloseRegionWhileRSCrash {
|
|||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestCloseRegionWhileRSCrash.class);
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(TestCloseRegionWhileRSCrash.class);
|
||||
|
||||
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
|
||||
|
||||
private static TableName TABLE_NAME = TableName.valueOf("Backoff");
|
||||
|
@ -189,25 +180,11 @@ public class TestCloseRegionWhileRSCrash {
|
|||
}
|
||||
});
|
||||
t.start();
|
||||
JsonParser parser = new JsonParser();
|
||||
long oldTimeout = 0;
|
||||
int timeoutIncrements = 0;
|
||||
// wait until we enter the WAITING_TIMEOUT state
|
||||
UTIL.waitFor(30000, () -> getTimeout(parser, UTIL.getAdmin().getProcedures()) > 0);
|
||||
while (true) {
|
||||
long timeout = getTimeout(parser, UTIL.getAdmin().getProcedures());
|
||||
if (timeout > oldTimeout) {
|
||||
LOG.info("Timeout incremented, was {}, now is {}, increments={}", timeout, oldTimeout,
|
||||
timeoutIncrements);
|
||||
oldTimeout = timeout;
|
||||
timeoutIncrements++;
|
||||
if (timeoutIncrements > 3) {
|
||||
// If we incremented at least twice, break; the backoff is working.
|
||||
break;
|
||||
}
|
||||
}
|
||||
Thread.sleep(1000);
|
||||
}
|
||||
ProcedureTestUtil.waitUntilProcedureWaitingTimeout(UTIL, TransitRegionStateProcedure.class,
|
||||
30000);
|
||||
// wait until the timeout value increase three times
|
||||
ProcedureTestUtil.waitUntilProcedureTimeoutIncrease(UTIL, TransitRegionStateProcedure.class, 3);
|
||||
// let's close the connection to make sure that the SCP can not update meta successfully
|
||||
UTIL.getMiniHBaseCluster().getMaster().getConnection().close();
|
||||
RESUME.countDown();
|
||||
|
@ -223,24 +200,4 @@ public class TestCloseRegionWhileRSCrash {
|
|||
table.put(new Put(Bytes.toBytes(1)).addColumn(CF, Bytes.toBytes("cq"), Bytes.toBytes(1)));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @param proceduresAsJSON This is String returned by admin.getProcedures call... an array of
|
||||
* Procedures as JSON.
|
||||
* @return The Procedure timeout value parsed from the TRSP.
|
||||
*/
|
||||
private long getTimeout(JsonParser parser, String proceduresAsJSON) {
|
||||
JsonArray array = parser.parse(proceduresAsJSON).getAsJsonArray();
|
||||
Iterator<JsonElement> iterator = array.iterator();
|
||||
while (iterator.hasNext()) {
|
||||
JsonElement element = iterator.next();
|
||||
JsonObject obj = element.getAsJsonObject();
|
||||
String className = obj.get("className").getAsString();
|
||||
String actualClassName = TransitRegionStateProcedure.class.getName();
|
||||
if (className.equals(actualClassName) && obj.has("timeout")) {
|
||||
return obj.get("timeout").getAsLong();
|
||||
}
|
||||
}
|
||||
return -1L;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,166 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.master.replication;
|
||||
|
||||
import java.io.IOException;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.ProcedureTestUtil;
|
||||
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
|
||||
import org.apache.hadoop.hbase.procedure2.Procedure;
|
||||
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationException;
|
||||
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
||||
import org.apache.hadoop.hbase.testclassification.MasterTests;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.PeerModificationState;
|
||||
|
||||
@Category({ MasterTests.class, LargeTests.class })
|
||||
public class TestModifyPeerProcedureRetryBackoff {
|
||||
|
||||
@ClassRule
|
||||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestModifyPeerProcedureRetryBackoff.class);
|
||||
|
||||
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
|
||||
|
||||
private static boolean FAIL = true;
|
||||
|
||||
public static class TestModifyPeerProcedure extends ModifyPeerProcedure {
|
||||
|
||||
public TestModifyPeerProcedure() {
|
||||
}
|
||||
|
||||
public TestModifyPeerProcedure(String peerId) {
|
||||
super(peerId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public PeerOperationType getPeerOperationType() {
|
||||
return PeerOperationType.ADD;
|
||||
}
|
||||
|
||||
private void tryFail() throws ReplicationException {
|
||||
synchronized (TestModifyPeerProcedureRetryBackoff.class) {
|
||||
if (FAIL) {
|
||||
throw new ReplicationException("Inject error");
|
||||
}
|
||||
FAIL = true;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected <T extends Procedure<MasterProcedureEnv>> void addChildProcedure(
|
||||
@SuppressWarnings("unchecked") T... subProcedure) {
|
||||
// Make it a no-op
|
||||
}
|
||||
|
||||
@Override
|
||||
protected PeerModificationState nextStateAfterRefresh() {
|
||||
return PeerModificationState.SERIAL_PEER_REOPEN_REGIONS;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean enablePeerBeforeFinish() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void updateLastPushedSequenceIdForSerialPeer(MasterProcedureEnv env)
|
||||
throws IOException, ReplicationException {
|
||||
tryFail();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void reopenRegions(MasterProcedureEnv env) throws IOException {
|
||||
try {
|
||||
tryFail();
|
||||
} catch (ReplicationException e) {
|
||||
throw new IOException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void enablePeer(MasterProcedureEnv env) throws ReplicationException {
|
||||
tryFail();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void prePeerModification(MasterProcedureEnv env)
|
||||
throws IOException, ReplicationException {
|
||||
tryFail();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void updatePeerStorage(MasterProcedureEnv env) throws ReplicationException {
|
||||
tryFail();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void postPeerModification(MasterProcedureEnv env)
|
||||
throws IOException, ReplicationException {
|
||||
tryFail();
|
||||
}
|
||||
}
|
||||
|
||||
@BeforeClass
|
||||
public static void setUp() throws Exception {
|
||||
UTIL.startMiniCluster(1);
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void tearDown() throws Exception {
|
||||
UTIL.shutdownMiniCluster();
|
||||
}
|
||||
|
||||
private void assertBackoffIncrease() throws IOException, InterruptedException {
|
||||
ProcedureTestUtil.waitUntilProcedureWaitingTimeout(UTIL, TestModifyPeerProcedure.class, 30000);
|
||||
ProcedureTestUtil.waitUntilProcedureTimeoutIncrease(UTIL, TestModifyPeerProcedure.class, 2);
|
||||
synchronized (TestModifyPeerProcedureRetryBackoff.class) {
|
||||
FAIL = false;
|
||||
}
|
||||
UTIL.waitFor(30000, () -> FAIL);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void test() throws IOException, InterruptedException {
|
||||
ProcedureExecutor<MasterProcedureEnv> procExec =
|
||||
UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor();
|
||||
long procId = procExec.submitProcedure(new TestModifyPeerProcedure("1"));
|
||||
// PRE_PEER_MODIFICATION
|
||||
assertBackoffIncrease();
|
||||
// UPDATE_PEER_STORAGE
|
||||
assertBackoffIncrease();
|
||||
// No retry for REFRESH_PEER_ON_RS
|
||||
// SERIAL_PEER_REOPEN_REGIONS
|
||||
assertBackoffIncrease();
|
||||
// SERIAL_PEER_UPDATE_LAST_PUSHED_SEQ_ID
|
||||
assertBackoffIncrease();
|
||||
// SERIAL_PEER_SET_PEER_ENABLED
|
||||
assertBackoffIncrease();
|
||||
// No retry for SERIAL_PEER_ENABLE_PEER_REFRESH_PEER_ON_RS
|
||||
// POST_PEER_MODIFICATION
|
||||
assertBackoffIncrease();
|
||||
UTIL.waitFor(30000, () -> procExec.isFinished(procId));
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue