HBASE-21248 Implement exponential backoff when retrying for ModifyPeerProcedure
This commit is contained in:
parent
ab6ec1f9e4
commit
fdbaa4c3f0
|
@ -32,7 +32,7 @@ import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
|
||||||
import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch;
|
import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch;
|
||||||
import org.apache.hadoop.hbase.master.procedure.ReopenTableRegionsProcedure;
|
import org.apache.hadoop.hbase.master.procedure.ReopenTableRegionsProcedure;
|
||||||
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
|
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
|
||||||
import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
|
import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
|
||||||
import org.apache.hadoop.hbase.replication.ReplicationException;
|
import org.apache.hadoop.hbase.replication.ReplicationException;
|
||||||
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
|
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
|
||||||
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
|
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
|
||||||
|
@ -42,7 +42,10 @@ import org.apache.yetus.audience.InterfaceAudience;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
|
||||||
|
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.PeerModificationState;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.PeerModificationState;
|
||||||
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The base class for all replication peer related procedure except sync replication state
|
* The base class for all replication peer related procedure except sync replication state
|
||||||
|
@ -58,6 +61,8 @@ public abstract class ModifyPeerProcedure extends AbstractPeerProcedure<PeerModi
|
||||||
// The sleep interval when waiting table to be enabled or disabled.
|
// The sleep interval when waiting table to be enabled or disabled.
|
||||||
protected static final int SLEEP_INTERVAL_MS = 1000;
|
protected static final int SLEEP_INTERVAL_MS = 1000;
|
||||||
|
|
||||||
|
private int attemps;
|
||||||
|
|
||||||
protected ModifyPeerProcedure() {
|
protected ModifyPeerProcedure() {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -143,7 +148,9 @@ public abstract class ModifyPeerProcedure extends AbstractPeerProcedure<PeerModi
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void reopenRegions(MasterProcedureEnv env) throws IOException {
|
// will be overridden in tests to simulate errors
|
||||||
|
@VisibleForTesting
|
||||||
|
protected void reopenRegions(MasterProcedureEnv env) throws IOException {
|
||||||
ReplicationPeerConfig peerConfig = getNewPeerConfig();
|
ReplicationPeerConfig peerConfig = getNewPeerConfig();
|
||||||
ReplicationPeerConfig oldPeerConfig = getOldPeerConfig();
|
ReplicationPeerConfig oldPeerConfig = getOldPeerConfig();
|
||||||
TableStateManager tsm = env.getMasterServices().getTableStateManager();
|
TableStateManager tsm = env.getMasterServices().getTableStateManager();
|
||||||
|
@ -165,6 +172,12 @@ public abstract class ModifyPeerProcedure extends AbstractPeerProcedure<PeerModi
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Enables the replication peer identified by peerId through the ReplicationPeerManager. Kept as a separate protected method so that tests can override it to simulate errors.
|
||||||
|
@VisibleForTesting
|
||||||
|
protected void enablePeer(MasterProcedureEnv env) throws ReplicationException {
|
||||||
|
env.getReplicationPeerManager().enablePeer(peerId);
|
||||||
|
}
|
||||||
|
|
||||||
private void addToMap(Map<String, Long> lastSeqIds, String encodedRegionName, long barrier,
|
private void addToMap(Map<String, Long> lastSeqIds, String encodedRegionName, long barrier,
|
||||||
ReplicationQueueStorage queueStorage) throws ReplicationException {
|
ReplicationQueueStorage queueStorage) throws ReplicationException {
|
||||||
if (barrier >= 0) {
|
if (barrier >= 0) {
|
||||||
|
@ -235,9 +248,24 @@ public abstract class ModifyPeerProcedure extends AbstractPeerProcedure<PeerModi
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Timeout callback used by the retry backoff: rather than failing the procedure, flip it back to
// RUNNABLE and put it at the front of the scheduler queue so the failed step is retried.
// NOTE(review): returning false presumably tells the executor not to handle the timeout itself;
// confirm against the Procedure#setTimeoutFailure contract.
@Override
|
||||||
|
protected synchronized boolean setTimeoutFailure(MasterProcedureEnv env) {
|
||||||
|
setState(ProcedureProtos.ProcedureState.RUNNABLE);
|
||||||
|
env.getProcedureScheduler().addFront(this);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Records one more failed attempt, arms a timeout of `backoff` milliseconds and moves the
// procedure to WAITING_TIMEOUT. This method never returns normally: it always throws
// ProcedureSuspendedException. The declared return type only exists so that callers can write
// "throw suspend(backoff);" and keep the compiler's flow analysis happy.
private ProcedureSuspendedException suspend(long backoff) throws ProcedureSuspendedException {
|
||||||
|
attemps++;
|
||||||
|
setTimeout(Math.toIntExact(backoff)); // toIntExact throws ArithmeticException if backoff does not fit in an int
|
||||||
|
setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT);
|
||||||
|
skipPersistence(); // NOTE(review): presumably avoids writing this transient waiting state to the procedure store; confirm
|
||||||
|
throw new ProcedureSuspendedException();
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected Flow executeFromState(MasterProcedureEnv env, PeerModificationState state)
|
protected Flow executeFromState(MasterProcedureEnv env, PeerModificationState state)
|
||||||
throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
|
throws ProcedureSuspendedException {
|
||||||
switch (state) {
|
switch (state) {
|
||||||
case PRE_PEER_MODIFICATION:
|
case PRE_PEER_MODIFICATION:
|
||||||
try {
|
try {
|
||||||
|
@ -249,20 +277,24 @@ public abstract class ModifyPeerProcedure extends AbstractPeerProcedure<PeerModi
|
||||||
releaseLatch();
|
releaseLatch();
|
||||||
return Flow.NO_MORE_STATE;
|
return Flow.NO_MORE_STATE;
|
||||||
} catch (ReplicationException e) {
|
} catch (ReplicationException e) {
|
||||||
LOG.warn("{} failed to call prePeerModification for peer {}, retry", getClass().getName(),
|
long backoff = ProcedureUtil.getBackoffTimeMs(attemps);
|
||||||
peerId, e);
|
LOG.warn("{} failed to call prePeerModification for peer {}, sleep {} secs",
|
||||||
throw new ProcedureYieldException();
|
getClass().getName(), peerId, backoff / 1000, e);
|
||||||
|
throw suspend(backoff);
|
||||||
}
|
}
|
||||||
|
attemps = 0;
|
||||||
setNextState(PeerModificationState.UPDATE_PEER_STORAGE);
|
setNextState(PeerModificationState.UPDATE_PEER_STORAGE);
|
||||||
return Flow.HAS_MORE_STATE;
|
return Flow.HAS_MORE_STATE;
|
||||||
case UPDATE_PEER_STORAGE:
|
case UPDATE_PEER_STORAGE:
|
||||||
try {
|
try {
|
||||||
updatePeerStorage(env);
|
updatePeerStorage(env);
|
||||||
} catch (ReplicationException e) {
|
} catch (ReplicationException e) {
|
||||||
LOG.warn("{} update peer storage for peer {} failed, retry", getClass().getName(), peerId,
|
long backoff = ProcedureUtil.getBackoffTimeMs(attemps);
|
||||||
e);
|
LOG.warn("{} update peer storage for peer {} failed, sleep {} secs", getClass().getName(),
|
||||||
throw new ProcedureYieldException();
|
peerId, backoff / 1000, e);
|
||||||
|
throw suspend(backoff);
|
||||||
}
|
}
|
||||||
|
attemps = 0;
|
||||||
setNextState(PeerModificationState.REFRESH_PEER_ON_RS);
|
setNextState(PeerModificationState.REFRESH_PEER_ON_RS);
|
||||||
return Flow.HAS_MORE_STATE;
|
return Flow.HAS_MORE_STATE;
|
||||||
case REFRESH_PEER_ON_RS:
|
case REFRESH_PEER_ON_RS:
|
||||||
|
@ -273,30 +305,37 @@ public abstract class ModifyPeerProcedure extends AbstractPeerProcedure<PeerModi
|
||||||
try {
|
try {
|
||||||
reopenRegions(env);
|
reopenRegions(env);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
LOG.warn("{} reopen regions for peer {} failed, retry", getClass().getName(), peerId, e);
|
long backoff = ProcedureUtil.getBackoffTimeMs(attemps);
|
||||||
throw new ProcedureYieldException();
|
LOG.warn("{} reopen regions for peer {} failed, sleep {} secs", getClass().getName(),
|
||||||
|
peerId, backoff / 1000, e);
|
||||||
|
throw suspend(backoff);
|
||||||
}
|
}
|
||||||
|
attemps = 0;
|
||||||
setNextState(PeerModificationState.SERIAL_PEER_UPDATE_LAST_PUSHED_SEQ_ID);
|
setNextState(PeerModificationState.SERIAL_PEER_UPDATE_LAST_PUSHED_SEQ_ID);
|
||||||
return Flow.HAS_MORE_STATE;
|
return Flow.HAS_MORE_STATE;
|
||||||
case SERIAL_PEER_UPDATE_LAST_PUSHED_SEQ_ID:
|
case SERIAL_PEER_UPDATE_LAST_PUSHED_SEQ_ID:
|
||||||
try {
|
try {
|
||||||
updateLastPushedSequenceIdForSerialPeer(env);
|
updateLastPushedSequenceIdForSerialPeer(env);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
LOG.warn("{} set last sequence id for peer {} failed, retry", getClass().getName(),
|
long backoff = ProcedureUtil.getBackoffTimeMs(attemps);
|
||||||
peerId, e);
|
LOG.warn("{} set last sequence id for peer {} failed, sleep {} secs",
|
||||||
throw new ProcedureYieldException();
|
getClass().getName(), peerId, backoff / 1000, e);
|
||||||
|
throw suspend(backoff);
|
||||||
}
|
}
|
||||||
|
attemps = 0;
|
||||||
setNextState(enablePeerBeforeFinish() ? PeerModificationState.SERIAL_PEER_SET_PEER_ENABLED
|
setNextState(enablePeerBeforeFinish() ? PeerModificationState.SERIAL_PEER_SET_PEER_ENABLED
|
||||||
: PeerModificationState.POST_PEER_MODIFICATION);
|
: PeerModificationState.POST_PEER_MODIFICATION);
|
||||||
return Flow.HAS_MORE_STATE;
|
return Flow.HAS_MORE_STATE;
|
||||||
case SERIAL_PEER_SET_PEER_ENABLED:
|
case SERIAL_PEER_SET_PEER_ENABLED:
|
||||||
try {
|
try {
|
||||||
env.getReplicationPeerManager().enablePeer(peerId);
|
enablePeer(env);
|
||||||
} catch (ReplicationException e) {
|
} catch (ReplicationException e) {
|
||||||
LOG.warn("{} enable peer before finish for peer {} failed, retry", getClass().getName(),
|
long backoff = ProcedureUtil.getBackoffTimeMs(attemps);
|
||||||
peerId, e);
|
LOG.warn("{} enable peer before finish for peer {} failed, sleep {} secs",
|
||||||
throw new ProcedureYieldException();
|
getClass().getName(), peerId, backoff / 1000, e);
|
||||||
|
throw suspend(backoff);
|
||||||
}
|
}
|
||||||
|
attemps = 0;
|
||||||
setNextState(PeerModificationState.SERIAL_PEER_ENABLE_PEER_REFRESH_PEER_ON_RS);
|
setNextState(PeerModificationState.SERIAL_PEER_ENABLE_PEER_REFRESH_PEER_ON_RS);
|
||||||
return Flow.HAS_MORE_STATE;
|
return Flow.HAS_MORE_STATE;
|
||||||
case SERIAL_PEER_ENABLE_PEER_REFRESH_PEER_ON_RS:
|
case SERIAL_PEER_ENABLE_PEER_REFRESH_PEER_ON_RS:
|
||||||
|
@ -307,9 +346,10 @@ public abstract class ModifyPeerProcedure extends AbstractPeerProcedure<PeerModi
|
||||||
try {
|
try {
|
||||||
postPeerModification(env);
|
postPeerModification(env);
|
||||||
} catch (ReplicationException e) {
|
} catch (ReplicationException e) {
|
||||||
LOG.warn("{} failed to call postPeerModification for peer {}, retry",
|
long backoff = ProcedureUtil.getBackoffTimeMs(attemps);
|
||||||
getClass().getName(), peerId, e);
|
LOG.warn("{} failed to call postPeerModification for peer {}, sleep {} secs",
|
||||||
throw new ProcedureYieldException();
|
getClass().getName(), peerId, backoff / 1000, e);
|
||||||
|
throw suspend(backoff);
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
LOG.warn("{} failed to call post CP hook for peer {}, " +
|
LOG.warn("{} failed to call post CP hook for peer {}, " +
|
||||||
"ignore since the procedure has already done", getClass().getName(), peerId, e);
|
"ignore since the procedure has already done", getClass().getName(), peerId, e);
|
||||||
|
|
|
@ -0,0 +1,85 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.Iterator;
|
||||||
|
import java.util.Optional;
|
||||||
|
import org.apache.hadoop.hbase.procedure2.Procedure;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import org.apache.hbase.thirdparty.com.google.gson.JsonArray;
|
||||||
|
import org.apache.hbase.thirdparty.com.google.gson.JsonElement;
|
||||||
|
import org.apache.hbase.thirdparty.com.google.gson.JsonObject;
|
||||||
|
import org.apache.hbase.thirdparty.com.google.gson.JsonParser;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
|
||||||
|
|
||||||
|
public final class ProcedureTestUtil {
|
||||||
|
|
||||||
|
private static final Logger LOG = LoggerFactory.getLogger(ProcedureTestUtil.class);
|
||||||
|
|
||||||
|
private ProcedureTestUtil() {
|
||||||
|
}
|
||||||
|
|
||||||
|
private static Optional<JsonObject> getProcedure(HBaseTestingUtility util,
|
||||||
|
Class<? extends Procedure<?>> clazz, JsonParser parser) throws IOException {
|
||||||
|
JsonArray array = parser.parse(util.getAdmin().getProcedures()).getAsJsonArray();
|
||||||
|
Iterator<JsonElement> iterator = array.iterator();
|
||||||
|
while (iterator.hasNext()) {
|
||||||
|
JsonElement element = iterator.next();
|
||||||
|
JsonObject obj = element.getAsJsonObject();
|
||||||
|
String className = obj.get("className").getAsString();
|
||||||
|
if (className.equals(clazz.getName())) {
|
||||||
|
return Optional.of(obj);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return Optional.empty();
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void waitUntilProcedureWaitingTimeout(HBaseTestingUtility util,
|
||||||
|
Class<? extends Procedure<?>> clazz, long timeout) throws IOException {
|
||||||
|
JsonParser parser = new JsonParser();
|
||||||
|
util.waitFor(timeout,
|
||||||
|
() -> getProcedure(util, clazz, parser)
|
||||||
|
.filter(o -> ProcedureState.WAITING_TIMEOUT.name().equals(o.get("state").getAsString()))
|
||||||
|
.isPresent());
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void waitUntilProcedureTimeoutIncrease(HBaseTestingUtility util,
|
||||||
|
Class<? extends Procedure<?>> clazz, int times) throws IOException, InterruptedException {
|
||||||
|
JsonParser parser = new JsonParser();
|
||||||
|
long oldTimeout = 0;
|
||||||
|
int timeoutIncrements = 0;
|
||||||
|
for (;;) {
|
||||||
|
long timeout = getProcedure(util, clazz, parser).filter(o -> o.has("timeout"))
|
||||||
|
.map(o -> o.get("timeout").getAsLong()).orElse(-1L);
|
||||||
|
if (timeout > oldTimeout) {
|
||||||
|
LOG.info("Timeout incremented, was {}, now is {}, increments={}", timeout, oldTimeout,
|
||||||
|
timeoutIncrements);
|
||||||
|
oldTimeout = timeout;
|
||||||
|
timeoutIncrements++;
|
||||||
|
if (timeoutIncrements > times) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Thread.sleep(1000);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -18,10 +18,10 @@
|
||||||
package org.apache.hadoop.hbase.master.assignment;
|
package org.apache.hadoop.hbase.master.assignment;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Iterator;
|
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||||
|
import org.apache.hadoop.hbase.ProcedureTestUtil;
|
||||||
import org.apache.hadoop.hbase.ServerName;
|
import org.apache.hadoop.hbase.ServerName;
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
import org.apache.hadoop.hbase.client.Put;
|
import org.apache.hadoop.hbase.client.Put;
|
||||||
|
@ -45,13 +45,6 @@ import org.junit.BeforeClass;
|
||||||
import org.junit.ClassRule;
|
import org.junit.ClassRule;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.junit.experimental.categories.Category;
|
import org.junit.experimental.categories.Category;
|
||||||
import org.slf4j.Logger;
|
|
||||||
import org.slf4j.LoggerFactory;
|
|
||||||
|
|
||||||
import org.apache.hbase.thirdparty.com.google.gson.JsonArray;
|
|
||||||
import org.apache.hbase.thirdparty.com.google.gson.JsonElement;
|
|
||||||
import org.apache.hbase.thirdparty.com.google.gson.JsonObject;
|
|
||||||
import org.apache.hbase.thirdparty.com.google.gson.JsonParser;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Confirm that we will do backoff when retrying on closing a region, to avoid consuming all the
|
* Confirm that we will do backoff when retrying on closing a region, to avoid consuming all the
|
||||||
|
@ -64,8 +57,6 @@ public class TestCloseRegionWhileRSCrash {
|
||||||
public static final HBaseClassTestRule CLASS_RULE =
|
public static final HBaseClassTestRule CLASS_RULE =
|
||||||
HBaseClassTestRule.forClass(TestCloseRegionWhileRSCrash.class);
|
HBaseClassTestRule.forClass(TestCloseRegionWhileRSCrash.class);
|
||||||
|
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(TestCloseRegionWhileRSCrash.class);
|
|
||||||
|
|
||||||
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
|
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
|
||||||
|
|
||||||
private static TableName TABLE_NAME = TableName.valueOf("Backoff");
|
private static TableName TABLE_NAME = TableName.valueOf("Backoff");
|
||||||
|
@ -189,25 +180,11 @@ public class TestCloseRegionWhileRSCrash {
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
t.start();
|
t.start();
|
||||||
JsonParser parser = new JsonParser();
|
|
||||||
long oldTimeout = 0;
|
|
||||||
int timeoutIncrements = 0;
|
|
||||||
// wait until we enter the WAITING_TIMEOUT state
|
// wait until we enter the WAITING_TIMEOUT state
|
||||||
UTIL.waitFor(30000, () -> getTimeout(parser, UTIL.getAdmin().getProcedures()) > 0);
|
ProcedureTestUtil.waitUntilProcedureWaitingTimeout(UTIL, TransitRegionStateProcedure.class,
|
||||||
while (true) {
|
30000);
|
||||||
long timeout = getTimeout(parser, UTIL.getAdmin().getProcedures());
|
// wait until the timeout value increase three times
|
||||||
if (timeout > oldTimeout) {
|
ProcedureTestUtil.waitUntilProcedureTimeoutIncrease(UTIL, TransitRegionStateProcedure.class, 3);
|
||||||
LOG.info("Timeout incremented, was {}, now is {}, increments={}", timeout, oldTimeout,
|
|
||||||
timeoutIncrements);
|
|
||||||
oldTimeout = timeout;
|
|
||||||
timeoutIncrements++;
|
|
||||||
if (timeoutIncrements > 3) {
|
|
||||||
// If we incremented at least twice, break; the backoff is working.
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Thread.sleep(1000);
|
|
||||||
}
|
|
||||||
// let's close the connection to make sure that the SCP can not update meta successfully
|
// let's close the connection to make sure that the SCP can not update meta successfully
|
||||||
UTIL.getMiniHBaseCluster().getMaster().getConnection().close();
|
UTIL.getMiniHBaseCluster().getMaster().getConnection().close();
|
||||||
RESUME.countDown();
|
RESUME.countDown();
|
||||||
|
@ -223,24 +200,4 @@ public class TestCloseRegionWhileRSCrash {
|
||||||
table.put(new Put(Bytes.toBytes(1)).addColumn(CF, Bytes.toBytes("cq"), Bytes.toBytes(1)));
|
table.put(new Put(Bytes.toBytes(1)).addColumn(CF, Bytes.toBytes("cq"), Bytes.toBytes(1)));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* @param proceduresAsJSON This is String returned by admin.getProcedures call... an array of
|
|
||||||
* Procedures as JSON.
|
|
||||||
* @return The Procedure timeout value parsed from the TRSP.
|
|
||||||
*/
|
|
||||||
private long getTimeout(JsonParser parser, String proceduresAsJSON) {
|
|
||||||
JsonArray array = parser.parse(proceduresAsJSON).getAsJsonArray();
|
|
||||||
Iterator<JsonElement> iterator = array.iterator();
|
|
||||||
while (iterator.hasNext()) {
|
|
||||||
JsonElement element = iterator.next();
|
|
||||||
JsonObject obj = element.getAsJsonObject();
|
|
||||||
String className = obj.get("className").getAsString();
|
|
||||||
String actualClassName = TransitRegionStateProcedure.class.getName();
|
|
||||||
if (className.equals(actualClassName) && obj.has("timeout")) {
|
|
||||||
return obj.get("timeout").getAsLong();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return -1L;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,166 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.master.replication;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||||
|
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||||
|
import org.apache.hadoop.hbase.ProcedureTestUtil;
|
||||||
|
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
|
||||||
|
import org.apache.hadoop.hbase.procedure2.Procedure;
|
||||||
|
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
|
||||||
|
import org.apache.hadoop.hbase.replication.ReplicationException;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.MasterTests;
|
||||||
|
import org.junit.AfterClass;
|
||||||
|
import org.junit.BeforeClass;
|
||||||
|
import org.junit.ClassRule;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.experimental.categories.Category;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.PeerModificationState;
|
||||||
|
|
||||||
|
@Category({ MasterTests.class, LargeTests.class })
|
||||||
|
public class TestModifyPeerProcedureRetryBackoff {
|
||||||
|
|
||||||
|
@ClassRule
|
||||||
|
public static final HBaseClassTestRule CLASS_RULE =
|
||||||
|
HBaseClassTestRule.forClass(TestModifyPeerProcedureRetryBackoff.class);
|
||||||
|
|
||||||
|
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
|
||||||
|
|
||||||
|
private static boolean FAIL = true;
|
||||||
|
|
||||||
|
public static class TestModifyPeerProcedure extends ModifyPeerProcedure {
|
||||||
|
|
||||||
|
public TestModifyPeerProcedure() {
|
||||||
|
}
|
||||||
|
|
||||||
|
public TestModifyPeerProcedure(String peerId) {
|
||||||
|
super(peerId);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public PeerOperationType getPeerOperationType() {
|
||||||
|
return PeerOperationType.ADD;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void tryFail() throws ReplicationException {
|
||||||
|
synchronized (TestModifyPeerProcedureRetryBackoff.class) {
|
||||||
|
if (FAIL) {
|
||||||
|
throw new ReplicationException("Inject error");
|
||||||
|
}
|
||||||
|
FAIL = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected <T extends Procedure<MasterProcedureEnv>> void addChildProcedure(
|
||||||
|
@SuppressWarnings("unchecked") T... subProcedure) {
|
||||||
|
// Make it a no-op
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected PeerModificationState nextStateAfterRefresh() {
|
||||||
|
return PeerModificationState.SERIAL_PEER_REOPEN_REGIONS;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected boolean enablePeerBeforeFinish() {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void updateLastPushedSequenceIdForSerialPeer(MasterProcedureEnv env)
|
||||||
|
throws IOException, ReplicationException {
|
||||||
|
tryFail();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void reopenRegions(MasterProcedureEnv env) throws IOException {
|
||||||
|
try {
|
||||||
|
tryFail();
|
||||||
|
} catch (ReplicationException e) {
|
||||||
|
throw new IOException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void enablePeer(MasterProcedureEnv env) throws ReplicationException {
|
||||||
|
tryFail();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void prePeerModification(MasterProcedureEnv env)
|
||||||
|
throws IOException, ReplicationException {
|
||||||
|
tryFail();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void updatePeerStorage(MasterProcedureEnv env) throws ReplicationException {
|
||||||
|
tryFail();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void postPeerModification(MasterProcedureEnv env)
|
||||||
|
throws IOException, ReplicationException {
|
||||||
|
tryFail();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@BeforeClass
|
||||||
|
public static void setUp() throws Exception {
|
||||||
|
UTIL.startMiniCluster(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
@AfterClass
|
||||||
|
public static void tearDown() throws Exception {
|
||||||
|
UTIL.shutdownMiniCluster();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void assertBackoffIncrease() throws IOException, InterruptedException {
|
||||||
|
ProcedureTestUtil.waitUntilProcedureWaitingTimeout(UTIL, TestModifyPeerProcedure.class, 30000);
|
||||||
|
ProcedureTestUtil.waitUntilProcedureTimeoutIncrease(UTIL, TestModifyPeerProcedure.class, 2);
|
||||||
|
synchronized (TestModifyPeerProcedureRetryBackoff.class) {
|
||||||
|
FAIL = false;
|
||||||
|
}
|
||||||
|
UTIL.waitFor(30000, () -> FAIL);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void test() throws IOException, InterruptedException {
|
||||||
|
ProcedureExecutor<MasterProcedureEnv> procExec =
|
||||||
|
UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor();
|
||||||
|
long procId = procExec.submitProcedure(new TestModifyPeerProcedure("1"));
|
||||||
|
// PRE_PEER_MODIFICATION
|
||||||
|
assertBackoffIncrease();
|
||||||
|
// UPDATE_PEER_STORAGE
|
||||||
|
assertBackoffIncrease();
|
||||||
|
// No retry for REFRESH_PEER_ON_RS
|
||||||
|
// SERIAL_PEER_REOPEN_REGIONS
|
||||||
|
assertBackoffIncrease();
|
||||||
|
// SERIAL_PEER_UPDATE_LAST_PUSHED_SEQ_ID
|
||||||
|
assertBackoffIncrease();
|
||||||
|
// SERIAL_PEER_SET_PEER_ENABLED
|
||||||
|
assertBackoffIncrease();
|
||||||
|
// No retry for SERIAL_PEER_ENABLE_PEER_REFRESH_PEER_ON_RS
|
||||||
|
// POST_PEER_MODIFICATION
|
||||||
|
assertBackoffIncrease();
|
||||||
|
UTIL.waitFor(30000, () -> procExec.isFinished(procId));
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue