HBASE-27429 Add exponential retry backoff support for MigrateReplicationQueueFromZkToTableProcedure

Signed-off-by: Liangjun He <heliangjun@apache.org>
This commit is contained in:
Duo Zhang 2022-10-18 16:46:03 +08:00 committed by Duo Zhang
parent ffad1ff727
commit dfb125f3b2
4 changed files with 125 additions and 70 deletions

View File

@ -78,9 +78,13 @@ class TimeoutExecutorThread<TEnvironment> extends StoppableThread {
} }
public void add(Procedure<TEnvironment> procedure) { public void add(Procedure<TEnvironment> procedure) {
LOG.info("ADDED {}; timeout={}, timestamp={}", procedure, procedure.getTimeout(), if (procedure.getTimeout() > 0) {
procedure.getTimeoutTimestamp()); LOG.info("ADDED {}; timeout={}, timestamp={}", procedure, procedure.getTimeout(),
queue.add(new DelayedProcedure<>(procedure)); procedure.getTimeoutTimestamp());
queue.add(new DelayedProcedure<>(procedure));
} else {
LOG.info("Got negative timeout {} for {}, skip adding", procedure.getTimeout(), procedure);
}
} }
public boolean remove(Procedure<TEnvironment> procedure) { public boolean remove(Procedure<TEnvironment> procedure) {

View File

@ -25,19 +25,25 @@ import static org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureP
import java.io.IOException; import java.io.IOException;
import java.util.List; import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.Future; import java.util.function.LongConsumer;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.master.procedure.GlobalProcedureInterface; import org.apache.hadoop.hbase.master.procedure.GlobalProcedureInterface;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface; import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface;
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
import org.apache.hadoop.hbase.procedure2.ProcedureYieldException; import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
import org.apache.hadoop.hbase.procedure2.StateMachineProcedure; import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration; import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.util.IdLock;
import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.hadoop.hbase.util.VersionInfo; import org.apache.hadoop.hbase.util.VersionInfo;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException;
@ -65,18 +71,34 @@ public class MigrateReplicationQueueFromZkToTableProcedure
private List<String> disabledPeerIds; private List<String> disabledPeerIds;
private List<Future<?>> futures; private CompletableFuture<?> future;
private ExecutorService executor; private ExecutorService executor;
private RetryCounter retryCounter;
@Override @Override
public String getGlobalId() { public String getGlobalId() {
return getClass().getSimpleName(); return getClass().getSimpleName();
} }
private ProcedureSuspendedException suspend(Configuration conf, LongConsumer backoffConsumer)
throws ProcedureSuspendedException {
if (retryCounter == null) {
retryCounter = ProcedureUtil.createRetryCounter(conf);
}
long backoff = retryCounter.getBackoffTimeAndIncrementAttempts();
backoffConsumer.accept(backoff);
throw suspend(Math.toIntExact(backoff), true);
}
private void resetRetry() {
retryCounter = null;
}
private ExecutorService getExecutorService() { private ExecutorService getExecutorService() {
if (executor == null) { if (executor == null) {
executor = Executors.newFixedThreadPool(3, new ThreadFactoryBuilder() executor = Executors.newCachedThreadPool(new ThreadFactoryBuilder()
.setNameFormat(getClass().getSimpleName() + "-%d").setDaemon(true).build()); .setNameFormat(getClass().getSimpleName() + "-%d").setDaemon(true).build());
} }
return executor; return executor;
@ -95,14 +117,17 @@ public class MigrateReplicationQueueFromZkToTableProcedure
peerProcCount = env.getMasterServices().getProcedures().stream() peerProcCount = env.getMasterServices().getProcedures().stream()
.filter(p -> p instanceof PeerProcedureInterface).filter(p -> !p.isFinished()).count(); .filter(p -> p instanceof PeerProcedureInterface).filter(p -> !p.isFinished()).count();
} catch (IOException e) { } catch (IOException e) {
LOG.warn("failed to check peer procedure status", e); throw suspend(env.getMasterConfiguration(),
throw suspend(5000, true); backoff -> LOG.warn("failed to check peer procedure status, sleep {} secs and retry later",
backoff / 1000, e));
} }
if (peerProcCount > 0) { if (peerProcCount > 0) {
LOG.info("There are still {} pending peer procedures, will sleep and check later", throw suspend(env.getMasterConfiguration(),
peerProcCount); backoff -> LOG.info(
throw suspend(10_000, true); "There are still {} pending peer procedures, sleep {} secs and retry later",
peerProcCount, backoff / 1000));
} }
resetRetry();
LOG.info("No pending peer procedures found, continue..."); LOG.info("No pending peer procedures found, continue...");
} }
@ -122,8 +147,10 @@ public class MigrateReplicationQueueFromZkToTableProcedure
try { try {
oldStorage.deleteAllData(); oldStorage.deleteAllData();
} catch (KeeperException e) { } catch (KeeperException e) {
LOG.warn("failed to delete old replication queue data, sleep and retry later", e); throw suspend(env.getMasterConfiguration(),
suspend(10_000, true); backoff -> LOG.warn(
"failed to delete old replication queue data, sleep {} secs and retry later",
backoff / 1000, e));
} }
return Flow.NO_MORE_STATE; return Flow.NO_MORE_STATE;
} }
@ -132,6 +159,7 @@ public class MigrateReplicationQueueFromZkToTableProcedure
disabledPeerIds = peers.stream().filter(ReplicationPeerDescription::isEnabled) disabledPeerIds = peers.stream().filter(ReplicationPeerDescription::isEnabled)
.map(ReplicationPeerDescription::getPeerId).collect(Collectors.toList()); .map(ReplicationPeerDescription::getPeerId).collect(Collectors.toList());
setNextState(MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_DISABLE_PEER); setNextState(MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_DISABLE_PEER);
resetRetry();
return Flow.HAS_MORE_STATE; return Flow.HAS_MORE_STATE;
case MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_DISABLE_PEER: case MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_DISABLE_PEER:
for (String peerId : disabledPeerIds) { for (String peerId : disabledPeerIds) {
@ -140,39 +168,52 @@ public class MigrateReplicationQueueFromZkToTableProcedure
setNextState(MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_MIGRATE); setNextState(MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_MIGRATE);
return Flow.HAS_MORE_STATE; return Flow.HAS_MORE_STATE;
case MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_MIGRATE: case MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_MIGRATE:
if (futures != null) { if (future != null) {
// wait until all futures done // should have finished when we arrive here
long notDone = futures.stream().filter(f -> !f.isDone()).count(); assert future.isDone();
if (notDone == 0) { try {
boolean succ = true; future.get();
for (Future<?> future : futures) { } catch (Exception e) {
try { future = null;
future.get(); throw suspend(env.getMasterConfiguration(),
} catch (Exception e) { backoff -> LOG.warn("failed to migrate queue data, sleep {} secs and retry later",
succ = false; backoff / 1000, e));
LOG.warn("Failed to migrate", e);
}
}
if (succ) {
shutdownExecutorService();
setNextState(MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_WAIT_UPGRADING);
return Flow.HAS_MORE_STATE;
}
// reschedule to retry migration again
futures = null;
} else {
LOG.info("There still {} pending migration tasks, will sleep and check later", notDone);
throw suspend(10_000, true);
} }
shutdownExecutorService();
setNextState(MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_WAIT_UPGRADING);
resetRetry();
return Flow.HAS_MORE_STATE;
} }
try { future = env.getReplicationPeerManager()
futures = env.getReplicationPeerManager() .migrateQueuesFromZk(env.getMasterServices().getZooKeeper(), getExecutorService());
.migrateQueuesFromZk(env.getMasterServices().getZooKeeper(), getExecutorService()); FutureUtils.addListener(future, (r, e) -> {
} catch (IOException e) { // should acquire procedure execution lock to make sure that the procedure executor has
LOG.warn("failed to submit migration tasks", e); // finished putting this procedure to the WAITING_TIMEOUT state, otherwise there could be
throw suspend(10_000, true); // race and cause unexpected result
} IdLock procLock =
throw suspend(10_000, true); env.getMasterServices().getMasterProcedureExecutor().getProcExecutionLock();
IdLock.Entry lockEntry;
try {
lockEntry = procLock.getLockEntry(getProcId());
} catch (IOException ioe) {
LOG.error("Error while acquiring execution lock for procedure {}"
+ " when trying to wake it up, aborting...", ioe);
env.getMasterServices().abort("Can not acquire procedure execution lock", e);
return;
}
try {
setTimeoutFailure(env);
} finally {
procLock.releaseLockEntry(lockEntry);
}
});
// here we set timeout to -1 so the ProcedureExecutor will not schedule a Timer for us
setTimeout(-1);
setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT);
// skip persistence is a must now since when restarting, if the procedure is in
// WAITING_TIMEOUT state and has -1 as timeout, it will block there forever...
skipPersistence();
throw new ProcedureSuspendedException();
case MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_WAIT_UPGRADING: case MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_WAIT_UPGRADING:
long rsWithLowerVersion = long rsWithLowerVersion =
env.getMasterServices().getServerManager().getOnlineServers().values().stream() env.getMasterServices().getServerManager().getOnlineServers().values().stream()
@ -181,9 +222,11 @@ public class MigrateReplicationQueueFromZkToTableProcedure
setNextState(MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_ENABLE_PEER); setNextState(MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_ENABLE_PEER);
return Flow.HAS_MORE_STATE; return Flow.HAS_MORE_STATE;
} else { } else {
LOG.info("There are still {} region servers which have a major version less than {}, " throw suspend(env.getMasterConfiguration(),
+ "will sleep and check later", rsWithLowerVersion, MIN_MAJOR_VERSION); backoff -> LOG.warn(
throw suspend(10_000, true); "There are still {} region servers which have a major version"
+ " less than {}, sleep {} secs and check later",
rsWithLowerVersion, MIN_MAJOR_VERSION, backoff / 1000));
} }
case MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_ENABLE_PEER: case MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_ENABLE_PEER:
for (String peerId : disabledPeerIds) { for (String peerId : disabledPeerIds) {

View File

@ -21,7 +21,6 @@ import com.google.errorprone.annotations.RestrictedApi;
import java.io.IOException; import java.io.IOException;
import java.net.URI; import java.net.URI;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.EnumSet; import java.util.EnumSet;
import java.util.HashMap; import java.util.HashMap;
@ -29,10 +28,10 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Optional; import java.util.Optional;
import java.util.Set; import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore; import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern; import java.util.regex.Pattern;
@ -72,6 +71,7 @@ import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration
import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration.ZkLastPushedSeqId; import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration.ZkLastPushedSeqId;
import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration.ZkReplicationQueueData; import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration.ZkReplicationQueueData;
import org.apache.hadoop.hbase.replication.master.ReplicationLogCleanerBarrier; import org.apache.hadoop.hbase.replication.master.ReplicationLogCleanerBarrier;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId; import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
@ -797,25 +797,38 @@ public class ReplicationPeerManager implements ConfigurationObserver {
} }
} }
private interface ExceptionalRunnable {
void run() throws Exception;
}
private CompletableFuture<?> runAsync(ExceptionalRunnable task, ExecutorService executor) {
CompletableFuture<?> future = new CompletableFuture<>();
executor.execute(() -> {
try {
task.run();
future.complete(null);
} catch (Exception e) {
future.completeExceptionally(e);
}
});
return future;
}
/** /**
* Submit the migration tasks to the given {@code executor} and return the futures. * Submit the migration tasks to the given {@code executor}.
*/ */
List<Future<?>> migrateQueuesFromZk(ZKWatcher zookeeper, ExecutorService executor) CompletableFuture<?> migrateQueuesFromZk(ZKWatcher zookeeper, ExecutorService executor) {
throws IOException {
// the replication queue table creation is asynchronous and will be triggered by addPeer, so // the replication queue table creation is asynchronous and will be triggered by addPeer, so
// here we need to manually initialize it since we will not call addPeer. // here we need to manually initialize it since we will not call addPeer.
initializeQueueStorage(); try {
initializeQueueStorage();
} catch (IOException e) {
return FutureUtils.failedFuture(e);
}
ZKReplicationQueueStorageForMigration oldStorage = ZKReplicationQueueStorageForMigration oldStorage =
new ZKReplicationQueueStorageForMigration(zookeeper, conf); new ZKReplicationQueueStorageForMigration(zookeeper, conf);
return Arrays.asList(executor.submit(() -> { return CompletableFuture.allOf(runAsync(() -> migrateQueues(oldStorage), executor),
migrateQueues(oldStorage); runAsync(() -> migrateLastPushedSeqIds(oldStorage), executor),
return null; runAsync(() -> migrateHFileRefs(oldStorage), executor));
}), executor.submit(() -> {
migrateLastPushedSeqIds(oldStorage);
return null;
}), executor.submit(() -> {
migrateHFileRefs(oldStorage);
return null;
}));
} }
} }

View File

@ -34,7 +34,6 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.Cell;
@ -146,9 +145,7 @@ public class TestReplicationPeerManagerMigrateQueuesFromZk {
@Test @Test
public void testNoPeers() throws Exception { public void testNoPeers() throws Exception {
prepareData(); prepareData();
for (Future<?> future : manager.migrateQueuesFromZk(UTIL.getZooKeeperWatcher(), EXECUTOR)) { manager.migrateQueuesFromZk(UTIL.getZooKeeperWatcher(), EXECUTOR).get(1, TimeUnit.MINUTES);
future.get(1, TimeUnit.MINUTES);
}
// should have called initializer // should have called initializer
verify(queueStorageInitializer).initialize(); verify(queueStorageInitializer).initialize();
// should have not migrated any data since there is no peer // should have not migrated any data since there is no peer
@ -165,9 +162,7 @@ public class TestReplicationPeerManagerMigrateQueuesFromZk {
// value is not used in this test, so just add a mock // value is not used in this test, so just add a mock
peers.put("peer_" + i, mock(ReplicationPeerDescription.class)); peers.put("peer_" + i, mock(ReplicationPeerDescription.class));
} }
for (Future<?> future : manager.migrateQueuesFromZk(UTIL.getZooKeeperWatcher(), EXECUTOR)) { manager.migrateQueuesFromZk(UTIL.getZooKeeperWatcher(), EXECUTOR).get(1, TimeUnit.MINUTES);
future.get(1, TimeUnit.MINUTES);
}
// should have called initializer // should have called initializer
verify(queueStorageInitializer).initialize(); verify(queueStorageInitializer).initialize();
List<ReplicationQueueData> queueDatas = queueStorage.listAllQueues(); List<ReplicationQueueData> queueDatas = queueStorage.listAllQueues();