HBASE-21248 Implement exponential backoff when retrying for ModifyPeerProcedure

This commit is contained in:
zhangduo 2018-09-29 09:55:24 +08:00
parent 3d1a2dbe68
commit 1728a20be2
4 changed files with 317 additions and 69 deletions

View File

@ -32,7 +32,7 @@ import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch;
import org.apache.hadoop.hbase.master.procedure.ReopenTableRegionsProcedure;
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
@ -42,7 +42,10 @@ import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.PeerModificationState;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
/**
* The base class for all replication peer related procedure except sync replication state
@ -58,6 +61,8 @@ public abstract class ModifyPeerProcedure extends AbstractPeerProcedure<PeerModi
// The sleep interval when waiting table to be enabled or disabled.
protected static final int SLEEP_INTERVAL_MS = 1000;
private int attemps;
protected ModifyPeerProcedure() {
}
@ -149,7 +154,9 @@ public abstract class ModifyPeerProcedure extends AbstractPeerProcedure<PeerModi
}
}
private void reopenRegions(MasterProcedureEnv env) throws IOException {
// will be override in test to simulate error
@VisibleForTesting
protected void reopenRegions(MasterProcedureEnv env) throws IOException {
ReplicationPeerConfig peerConfig = getNewPeerConfig();
ReplicationPeerConfig oldPeerConfig = getOldPeerConfig();
TableStateManager tsm = env.getMasterServices().getTableStateManager();
@ -171,6 +178,12 @@ public abstract class ModifyPeerProcedure extends AbstractPeerProcedure<PeerModi
}
}
// will be override in test to simulate error
@VisibleForTesting
protected void enablePeer(MasterProcedureEnv env) throws ReplicationException {
env.getReplicationPeerManager().enablePeer(peerId);
}
private void addToMap(Map<String, Long> lastSeqIds, String encodedRegionName, long barrier,
ReplicationQueueStorage queueStorage) throws ReplicationException {
if (barrier >= 0) {
@ -241,9 +254,24 @@ public abstract class ModifyPeerProcedure extends AbstractPeerProcedure<PeerModi
}
}
@Override
protected synchronized boolean setTimeoutFailure(MasterProcedureEnv env) {
setState(ProcedureProtos.ProcedureState.RUNNABLE);
env.getProcedureScheduler().addFront(this);
return false;
}
private ProcedureSuspendedException suspend(long backoff) throws ProcedureSuspendedException {
attemps++;
setTimeout(Math.toIntExact(backoff));
setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT);
skipPersistence();
throw new ProcedureSuspendedException();
}
@Override
protected Flow executeFromState(MasterProcedureEnv env, PeerModificationState state)
throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
throws ProcedureSuspendedException {
switch (state) {
case PRE_PEER_MODIFICATION:
try {
@ -255,20 +283,24 @@ public abstract class ModifyPeerProcedure extends AbstractPeerProcedure<PeerModi
releaseLatch();
return Flow.NO_MORE_STATE;
} catch (ReplicationException e) {
LOG.warn("{} failed to call prePeerModification for peer {}, retry", getClass().getName(),
peerId, e);
throw new ProcedureYieldException();
long backoff = ProcedureUtil.getBackoffTimeMs(attemps);
LOG.warn("{} failed to call prePeerModification for peer {}, sleep {} secs",
getClass().getName(), peerId, backoff / 1000, e);
throw suspend(backoff);
}
attemps = 0;
setNextState(PeerModificationState.UPDATE_PEER_STORAGE);
return Flow.HAS_MORE_STATE;
case UPDATE_PEER_STORAGE:
try {
updatePeerStorage(env);
} catch (ReplicationException e) {
LOG.warn("{} update peer storage for peer {} failed, retry", getClass().getName(), peerId,
e);
throw new ProcedureYieldException();
long backoff = ProcedureUtil.getBackoffTimeMs(attemps);
LOG.warn("{} update peer storage for peer {} failed, sleep {} secs", getClass().getName(),
peerId, backoff / 1000, e);
throw suspend(backoff);
}
attemps = 0;
setNextState(PeerModificationState.REFRESH_PEER_ON_RS);
return Flow.HAS_MORE_STATE;
case REFRESH_PEER_ON_RS:
@ -279,30 +311,37 @@ public abstract class ModifyPeerProcedure extends AbstractPeerProcedure<PeerModi
try {
reopenRegions(env);
} catch (Exception e) {
LOG.warn("{} reopen regions for peer {} failed, retry", getClass().getName(), peerId, e);
throw new ProcedureYieldException();
long backoff = ProcedureUtil.getBackoffTimeMs(attemps);
LOG.warn("{} reopen regions for peer {} failed, sleep {} secs", getClass().getName(),
peerId, backoff / 1000, e);
throw suspend(backoff);
}
attemps = 0;
setNextState(PeerModificationState.SERIAL_PEER_UPDATE_LAST_PUSHED_SEQ_ID);
return Flow.HAS_MORE_STATE;
case SERIAL_PEER_UPDATE_LAST_PUSHED_SEQ_ID:
try {
updateLastPushedSequenceIdForSerialPeer(env);
} catch (Exception e) {
LOG.warn("{} set last sequence id for peer {} failed, retry", getClass().getName(),
peerId, e);
throw new ProcedureYieldException();
long backoff = ProcedureUtil.getBackoffTimeMs(attemps);
LOG.warn("{} set last sequence id for peer {} failed, sleep {} secs",
getClass().getName(), peerId, backoff / 1000, e);
throw suspend(backoff);
}
attemps = 0;
setNextState(enablePeerBeforeFinish() ? PeerModificationState.SERIAL_PEER_SET_PEER_ENABLED
: PeerModificationState.POST_PEER_MODIFICATION);
return Flow.HAS_MORE_STATE;
case SERIAL_PEER_SET_PEER_ENABLED:
try {
env.getReplicationPeerManager().enablePeer(peerId);
enablePeer(env);
} catch (ReplicationException e) {
LOG.warn("{} enable peer before finish for peer {} failed, retry", getClass().getName(),
peerId, e);
throw new ProcedureYieldException();
long backoff = ProcedureUtil.getBackoffTimeMs(attemps);
LOG.warn("{} enable peer before finish for peer {} failed, sleep {} secs",
getClass().getName(), peerId, backoff / 1000, e);
throw suspend(backoff);
}
attemps = 0;
setNextState(PeerModificationState.SERIAL_PEER_ENABLE_PEER_REFRESH_PEER_ON_RS);
return Flow.HAS_MORE_STATE;
case SERIAL_PEER_ENABLE_PEER_REFRESH_PEER_ON_RS:
@ -313,9 +352,10 @@ public abstract class ModifyPeerProcedure extends AbstractPeerProcedure<PeerModi
try {
postPeerModification(env);
} catch (ReplicationException e) {
LOG.warn("{} failed to call postPeerModification for peer {}, retry",
getClass().getName(), peerId, e);
throw new ProcedureYieldException();
long backoff = ProcedureUtil.getBackoffTimeMs(attemps);
LOG.warn("{} failed to call postPeerModification for peer {}, sleep {} secs",
getClass().getName(), peerId, backoff / 1000, e);
throw suspend(backoff);
} catch (IOException e) {
LOG.warn("{} failed to call post CP hook for peer {}, " +
"ignore since the procedure has already done", getClass().getName(), peerId, e);

View File

@ -0,0 +1,85 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase;
import java.io.IOException;
import java.util.Iterator;
import java.util.Optional;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.gson.JsonArray;
import org.apache.hbase.thirdparty.com.google.gson.JsonElement;
import org.apache.hbase.thirdparty.com.google.gson.JsonObject;
import org.apache.hbase.thirdparty.com.google.gson.JsonParser;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
public final class ProcedureTestUtil {
private static final Logger LOG = LoggerFactory.getLogger(ProcedureTestUtil.class);
private ProcedureTestUtil() {
}
private static Optional<JsonObject> getProcedure(HBaseTestingUtility util,
Class<? extends Procedure<?>> clazz, JsonParser parser) throws IOException {
JsonArray array = parser.parse(util.getAdmin().getProcedures()).getAsJsonArray();
Iterator<JsonElement> iterator = array.iterator();
while (iterator.hasNext()) {
JsonElement element = iterator.next();
JsonObject obj = element.getAsJsonObject();
String className = obj.get("className").getAsString();
if (className.equals(clazz.getName())) {
return Optional.of(obj);
}
}
return Optional.empty();
}
public static void waitUntilProcedureWaitingTimeout(HBaseTestingUtility util,
Class<? extends Procedure<?>> clazz, long timeout) throws IOException {
JsonParser parser = new JsonParser();
util.waitFor(timeout,
() -> getProcedure(util, clazz, parser)
.filter(o -> ProcedureState.WAITING_TIMEOUT.name().equals(o.get("state").getAsString()))
.isPresent());
}
public static void waitUntilProcedureTimeoutIncrease(HBaseTestingUtility util,
Class<? extends Procedure<?>> clazz, int times) throws IOException, InterruptedException {
JsonParser parser = new JsonParser();
long oldTimeout = 0;
int timeoutIncrements = 0;
for (;;) {
long timeout = getProcedure(util, clazz, parser).filter(o -> o.has("timeout"))
.map(o -> o.get("timeout").getAsLong()).orElse(-1L);
if (timeout > oldTimeout) {
LOG.info("Timeout incremented, was {}, now is {}, increments={}", timeout, oldTimeout,
timeoutIncrements);
oldTimeout = timeout;
timeoutIncrements++;
if (timeoutIncrements > times) {
break;
}
}
Thread.sleep(1000);
}
}
}

View File

@ -18,10 +18,10 @@
package org.apache.hadoop.hbase.master.assignment;
import java.io.IOException;
import java.util.Iterator;
import java.util.concurrent.CountDownLatch;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.ProcedureTestUtil;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
@ -45,13 +45,6 @@ import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.gson.JsonArray;
import org.apache.hbase.thirdparty.com.google.gson.JsonElement;
import org.apache.hbase.thirdparty.com.google.gson.JsonObject;
import org.apache.hbase.thirdparty.com.google.gson.JsonParser;
/**
* Confirm that we will do backoff when retrying on closing a region, to avoid consuming all the
@ -64,8 +57,6 @@ public class TestCloseRegionWhileRSCrash {
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestCloseRegionWhileRSCrash.class);
private static final Logger LOG = LoggerFactory.getLogger(TestCloseRegionWhileRSCrash.class);
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
private static TableName TABLE_NAME = TableName.valueOf("Backoff");
@ -189,25 +180,11 @@ public class TestCloseRegionWhileRSCrash {
}
});
t.start();
JsonParser parser = new JsonParser();
long oldTimeout = 0;
int timeoutIncrements = 0;
// wait until we enter the WAITING_TIMEOUT state
UTIL.waitFor(30000, () -> getTimeout(parser, UTIL.getAdmin().getProcedures()) > 0);
while (true) {
long timeout = getTimeout(parser, UTIL.getAdmin().getProcedures());
if (timeout > oldTimeout) {
LOG.info("Timeout incremented, was {}, now is {}, increments={}", timeout, oldTimeout,
timeoutIncrements);
oldTimeout = timeout;
timeoutIncrements++;
if (timeoutIncrements > 3) {
// If we incremented at least twice, break; the backoff is working.
break;
}
}
Thread.sleep(1000);
}
ProcedureTestUtil.waitUntilProcedureWaitingTimeout(UTIL, TransitRegionStateProcedure.class,
30000);
// wait until the timeout value increase three times
ProcedureTestUtil.waitUntilProcedureTimeoutIncrease(UTIL, TransitRegionStateProcedure.class, 3);
// let's close the connection to make sure that the SCP can not update meta successfully
UTIL.getMiniHBaseCluster().getMaster().getConnection().close();
RESUME.countDown();
@ -223,24 +200,4 @@ public class TestCloseRegionWhileRSCrash {
table.put(new Put(Bytes.toBytes(1)).addColumn(CF, Bytes.toBytes("cq"), Bytes.toBytes(1)));
}
}
/**
* @param proceduresAsJSON This is String returned by admin.getProcedures call... an array of
* Procedures as JSON.
* @return The Procedure timeout value parsed from the TRSP.
*/
private long getTimeout(JsonParser parser, String proceduresAsJSON) {
JsonArray array = parser.parse(proceduresAsJSON).getAsJsonArray();
Iterator<JsonElement> iterator = array.iterator();
while (iterator.hasNext()) {
JsonElement element = iterator.next();
JsonObject obj = element.getAsJsonObject();
String className = obj.get("className").getAsString();
String actualClassName = TransitRegionStateProcedure.class.getName();
if (className.equals(actualClassName) && obj.has("timeout")) {
return obj.get("timeout").getAsLong();
}
}
return -1L;
}
}

View File

@ -0,0 +1,166 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.replication;
import java.io.IOException;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.ProcedureTestUtil;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.PeerModificationState;
@Category({ MasterTests.class, LargeTests.class })
public class TestModifyPeerProcedureRetryBackoff {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestModifyPeerProcedureRetryBackoff.class);
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
private static boolean FAIL = true;
public static class TestModifyPeerProcedure extends ModifyPeerProcedure {
public TestModifyPeerProcedure() {
}
public TestModifyPeerProcedure(String peerId) {
super(peerId);
}
@Override
public PeerOperationType getPeerOperationType() {
return PeerOperationType.ADD;
}
private void tryFail() throws ReplicationException {
synchronized (TestModifyPeerProcedureRetryBackoff.class) {
if (FAIL) {
throw new ReplicationException("Inject error");
}
FAIL = true;
}
}
@Override
protected <T extends Procedure<MasterProcedureEnv>> void addChildProcedure(
@SuppressWarnings("unchecked") T... subProcedure) {
// Make it a no-op
}
@Override
protected PeerModificationState nextStateAfterRefresh() {
return PeerModificationState.SERIAL_PEER_REOPEN_REGIONS;
}
@Override
protected boolean enablePeerBeforeFinish() {
return true;
}
@Override
protected void updateLastPushedSequenceIdForSerialPeer(MasterProcedureEnv env)
throws IOException, ReplicationException {
tryFail();
}
@Override
protected void reopenRegions(MasterProcedureEnv env) throws IOException {
try {
tryFail();
} catch (ReplicationException e) {
throw new IOException(e);
}
}
@Override
protected void enablePeer(MasterProcedureEnv env) throws ReplicationException {
tryFail();
}
@Override
protected void prePeerModification(MasterProcedureEnv env)
throws IOException, ReplicationException {
tryFail();
}
@Override
protected void updatePeerStorage(MasterProcedureEnv env) throws ReplicationException {
tryFail();
}
@Override
protected void postPeerModification(MasterProcedureEnv env)
throws IOException, ReplicationException {
tryFail();
}
}
@BeforeClass
public static void setUp() throws Exception {
UTIL.startMiniCluster(1);
}
@AfterClass
public static void tearDown() throws Exception {
UTIL.shutdownMiniCluster();
}
private void assertBackoffIncrease() throws IOException, InterruptedException {
ProcedureTestUtil.waitUntilProcedureWaitingTimeout(UTIL, TestModifyPeerProcedure.class, 30000);
ProcedureTestUtil.waitUntilProcedureTimeoutIncrease(UTIL, TestModifyPeerProcedure.class, 2);
synchronized (TestModifyPeerProcedureRetryBackoff.class) {
FAIL = false;
}
UTIL.waitFor(30000, () -> FAIL);
}
@Test
public void test() throws IOException, InterruptedException {
ProcedureExecutor<MasterProcedureEnv> procExec =
UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor();
long procId = procExec.submitProcedure(new TestModifyPeerProcedure("1"));
// PRE_PEER_MODIFICATION
assertBackoffIncrease();
// UPDATE_PEER_STORAGE
assertBackoffIncrease();
// No retry for REFRESH_PEER_ON_RS
// SERIAL_PEER_REOPEN_REGIONS
assertBackoffIncrease();
// SERIAL_PEER_UPDATE_LAST_PUSHED_SEQ_ID
assertBackoffIncrease();
// SERIAL_PEER_SET_PEER_ENABLED
assertBackoffIncrease();
// No retry for SERIAL_PEER_ENABLE_PEER_REFRESH_PEER_ON_RS
// POST_PEER_MODIFICATION
assertBackoffIncrease();
UTIL.waitFor(30000, () -> procExec.isFinished(procId));
}
}