HBASE-21420 Use procedure event to wake up the SyncReplicationReplayWALProcedures which wait for worker

Duo Zhang 2018-11-05 17:18:57 +08:00 committed by zhangduo
parent ea0f1391f1
commit c8574ba3c5
11 changed files with 492 additions and 244 deletions

View File

@ -27,6 +27,8 @@ import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
@InterfaceAudience.Private
public abstract class AbstractProcedureScheduler implements ProcedureScheduler {
private static final Logger LOG = LoggerFactory.getLogger(AbstractProcedureScheduler.class);
@ -245,7 +247,8 @@ public abstract class AbstractProcedureScheduler implements ProcedureScheduler {
* Access should remain package-private. Use ProcedureEvent class to wake/suspend events.
* @param events the list of events to wake
*/
void wakeEvents(ProcedureEvent[] events) {
@VisibleForTesting
public void wakeEvents(ProcedureEvent[] events) {
schedLock();
try {
for (ProcedureEvent event : events) {

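The visibility changes in this file (and in ProcedureEvent below) exist so that tests can drive event wakeup directly; TestSyncReplicationReplayWALManager at the end of this commit stubs wakeEvents for exactly that purpose. For readers new to the pattern, a minimal standalone model of how an event parks and reschedules procedures; the names are simplified and are not the real HBase signatures:

import java.util.ArrayDeque;
import java.util.Queue;

// Simplified model of the ProcedureEvent suspend/wake pattern; illustrative only.
final class EventModel {
  private boolean ready = false;
  private final Queue<Runnable> suspended = new ArrayDeque<>();

  // Park the procedure if the event is not ready yet (cf. suspendIfNotReady).
  synchronized boolean suspendIfNotReady(Runnable proc) {
    if (!ready) {
      suspended.add(proc);
    }
    return !ready;
  }

  // Mark the event ready and move every parked procedure back to the run
  // queue (cf. wakeEvents/wakeInternal, done under the scheduler lock).
  synchronized void wake(Queue<Runnable> runQueue) {
    ready = true;
    while (!suspended.isEmpty()) {
      runQueue.add(suspended.poll());
    }
  }
}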
View File

@ -91,7 +91,8 @@ public class ProcedureEvent<T> {
* when waking up multiple events.
* Access should remain package-private.
*/
synchronized void wakeInternal(AbstractProcedureScheduler procedureScheduler) {
@VisibleForTesting
public synchronized void wakeInternal(AbstractProcedureScheduler procedureScheduler) {
if (ready && !suspendedProcedures.isEmpty()) {
LOG.warn("Found procedures suspended in a ready event! Size=" + suspendedProcedures.size());
}

View File

@ -507,6 +507,7 @@ message RecoverStandbyStateData {
message SyncReplicationReplayWALStateData {
required string peer_id = 1;
repeated string wal = 2;
optional ServerName worker = 3;
}
message SyncReplicationReplayWALRemoteStateData {

View File

@ -35,7 +35,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
* zookeeper.
*/
@InterfaceAudience.Private
public class ZKReplicationStorageBase {
public abstract class ZKReplicationStorageBase {
public static final String REPLICATION_ZNODE = "zookeeper.znode.replication";
public static final String REPLICATION_ZNODE_DEFAULT = "replication";

View File

@ -20,13 +20,11 @@ package org.apache.hadoop.hbase.master.replication;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -34,6 +32,10 @@ import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RecoverStandbyState;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RecoverStandbyStateData;
/**
* The procedure for replaying all the remote wals for transiting a sync replication peer from
* STANDBY to DOWNGRADE_ACTIVE.
*/
@InterfaceAudience.Private
public class RecoverStandbyProcedure extends AbstractPeerNoLockProcedure<RecoverStandbyState> {
@ -53,7 +55,7 @@ public class RecoverStandbyProcedure extends AbstractPeerNoLockProcedure<Recover
protected Flow executeFromState(MasterProcedureEnv env, RecoverStandbyState state)
throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
SyncReplicationReplayWALManager syncReplicationReplayWALManager =
env.getMasterServices().getSyncReplicationReplayWALManager();
env.getMasterServices().getSyncReplicationReplayWALManager();
switch (state) {
case RENAME_SYNC_REPLICATION_WALS_DIR:
try {
@ -66,12 +68,7 @@ public class RecoverStandbyProcedure extends AbstractPeerNoLockProcedure<Recover
setNextState(RecoverStandbyState.REGISTER_PEER_TO_WORKER_STORAGE);
return Flow.HAS_MORE_STATE;
case REGISTER_PEER_TO_WORKER_STORAGE:
try {
syncReplicationReplayWALManager.registerPeer(peerId);
} catch (ReplicationException e) {
LOG.warn("Failed to register peer to worker storage for peer id={}, retry", peerId, e);
throw new ProcedureYieldException();
}
syncReplicationReplayWALManager.registerPeer(peerId);
setNextState(RecoverStandbyState.DISPATCH_WALS);
return Flow.HAS_MORE_STATE;
case DISPATCH_WALS:
@ -79,13 +76,7 @@ public class RecoverStandbyProcedure extends AbstractPeerNoLockProcedure<Recover
setNextState(RecoverStandbyState.UNREGISTER_PEER_FROM_WORKER_STORAGE);
return Flow.HAS_MORE_STATE;
case UNREGISTER_PEER_FROM_WORKER_STORAGE:
try {
syncReplicationReplayWALManager.unregisterPeer(peerId);
} catch (ReplicationException e) {
LOG.warn("Failed to unregister peer from worker storage for peer id={}, retry", peerId,
e);
throw new ProcedureYieldException();
}
syncReplicationReplayWALManager.unregisterPeer(peerId);
setNextState(RecoverStandbyState.SNAPSHOT_SYNC_REPLICATION_WALS_DIR);
return Flow.HAS_MORE_STATE;
case SNAPSHOT_SYNC_REPLICATION_WALS_DIR:
@ -106,9 +97,10 @@ public class RecoverStandbyProcedure extends AbstractPeerNoLockProcedure<Recover
throws ProcedureYieldException {
try {
List<Path> wals = syncReplicationReplayWALManager.getReplayWALsAndCleanUpUnusedFiles(peerId);
addChildProcedure(wals.stream().map(wal -> new SyncReplicationReplayWALProcedure(peerId,
addChildProcedure(wals.stream()
.map(wal -> new SyncReplicationReplayWALProcedure(peerId,
Arrays.asList(syncReplicationReplayWALManager.removeWALRootPath(wal))))
.toArray(SyncReplicationReplayWALProcedure[]::new));
.toArray(SyncReplicationReplayWALProcedure[]::new));
} catch (IOException e) {
LOG.warn("Failed to get replay wals for peer id={}, , retry", peerId, e);
throw new ProcedureYieldException();
@ -147,4 +139,19 @@ public class RecoverStandbyProcedure extends AbstractPeerNoLockProcedure<Recover
RecoverStandbyStateData data = serializer.deserialize(RecoverStandbyStateData.class);
serial = data.getSerial();
}
@Override
protected void afterReplay(MasterProcedureEnv env) {
// For these two states, we need to register the peer with the replay manager, as the states are
// only kept in memory and will be lost after restarting. And in
// SyncReplicationReplayWALProcedure.afterReplay we will reconstruct the used workers.
switch (getCurrentState()) {
case DISPATCH_WALS:
case UNREGISTER_PEER_FROM_WORKER_STORAGE:
env.getMasterServices().getSyncReplicationReplayWALManager().registerPeer(peerId);
break;
default:
break;
}
}
}

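The new afterReplay hook above handles master restarts: registerPeer now only touches in-memory state in SyncReplicationReplayWALManager, so a procedure reloaded in DISPATCH_WALS or UNREGISTER_PEER_FROM_WORKER_STORAGE must redo the registration itself. A self-contained sketch of the idea, with an illustrative enum and class name:

import java.util.HashSet;
import java.util.Set;

// Sketch of the afterReplay idea; the enum mirrors RecoverStandbyState but
// the class and method names here are illustrative.
final class RecoveryModel {
  enum State { RENAME_DIR, REGISTER_PEER, DISPATCH_WALS, UNREGISTER_PEER, SNAPSHOT_DIR }

  // Lives only in master memory, so it is empty after a restart.
  private final Set<String> registeredPeers = new HashSet<>();

  // Called when a persisted procedure is reloaded after a master restart.
  void afterReplay(State current, String peerId) {
    switch (current) {
      case DISPATCH_WALS:
      case UNREGISTER_PEER:
        // These states only run after registration succeeded, so redo the
        // in-memory registration that was lost with the old process.
        registeredPeers.add(peerId);
        break;
      default:
        break;
    }
  }
}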
View File

@ -17,25 +17,29 @@
*/
package org.apache.hadoop.hbase.master.replication;
import static org.apache.hadoop.hbase.replication.ReplicationUtils.REMOTE_WAL_REPLAY_SUFFIX;
import static org.apache.hadoop.hbase.replication.ReplicationUtils.getPeerRemoteWALDir;
import static org.apache.hadoop.hbase.replication.ReplicationUtils.getPeerReplayWALDir;
import static org.apache.hadoop.hbase.replication.ReplicationUtils.getPeerSnapshotWALDir;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.ServerListener;
import org.apache.hadoop.hbase.master.ServerManager;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.util.FSUtils;
@ -45,13 +49,34 @@ import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
/**
* The manager for replaying remote wal.
* <p/>
* First, it will be used to balance the replay work across all the region servers. We will record
* the region servers which have already been used for replaying wal, and prevent sending new
* replay work to them until the previous replay work has been done, at which point we will remove
* the region server from the used worker set. See the comment for {@code UsedReplayWorkersForPeer}
* for more details.
* <p/>
* Second, the logic for managing the remote wal directory is kept here. Before replaying the wals,
* we will rename the remote wal directory, the new name is called 'replay' directory, see
* {@link #renameToPeerReplayWALDir(String)}. This is used to prevent further writing of remote
* wals, which is very important for keeping consistency. And then we will start replaying all the
* wals, once a wal has been replayed, we will truncate the file, so that if a crash happens, we do
* not need to replay all the wals again, see {@link #finishReplayWAL(String)} and
* {@link #isReplayWALFinished(String)}. After replaying all the wals, we will rename the 'replay'
* directory, the new name is called 'snapshot' directory. In this directory, we will keep the
* names of all the replayed wals, since all the files should have been truncated. When we transit
* the original ACTIVE cluster to STANDBY later, and a region server crashes, we will check the
* wals in this directory to determine whether a wal should be split and replayed or not. You can
* see the code in {@link org.apache.hadoop.hbase.regionserver.SplitLogWorker} for more details.
*/
@InterfaceAudience.Private
public class SyncReplicationReplayWALManager {
private static final Logger LOG =
LoggerFactory.getLogger(SyncReplicationReplayWALManager.class);
private static final Logger LOG = LoggerFactory.getLogger(SyncReplicationReplayWALManager.class);
private final MasterServices services;
private final ServerManager serverManager;
private final FileSystem fs;
@ -59,67 +84,128 @@ public class SyncReplicationReplayWALManager {
private final Path remoteWALDir;
private final ZKSyncReplicationReplayWALWorkerStorage workerStorage;
/**
* This class is used to record the used workers (region servers) for a replication peer. To
* balance the replay work, we will only schedule one remote replay procedure on a region server
* at a time. So when acquiring a worker, we will first get all the region servers for this
* cluster, and then filter out the used ones.
* <p/>
* The {@link ProcedureEvent} is used for notifying procedures that there are available workers
* now. We used to sleep and retry, but if the interval is too large, for example with
* exponential backoff, the wakeup is not timely, while if the interval is too small we will
* flood the procedure wal.
* <p/>
* The states are only kept in memory, so when restarting, we need to reconstruct this
* information using the information stored in related procedures. See the {@code afterReplay}
* method in {@link RecoverStandbyProcedure} and {@link SyncReplicationReplayWALProcedure} for
* more details.
*/
private static final class UsedReplayWorkersForPeer {
private final Map<String, Set<ServerName>> workers = new HashMap<>();
private final Set<ServerName> usedWorkers = new HashSet<ServerName>();
private final Object workerLock = new Object();
private final ProcedureEvent<?> event;
public UsedReplayWorkersForPeer(String peerId) {
this.event = new ProcedureEvent<>(peerId);
}
public void used(ServerName worker) {
usedWorkers.add(worker);
}
public Optional<ServerName> acquire(ServerManager serverManager) {
Optional<ServerName> worker = serverManager.getOnlineServers().keySet().stream()
.filter(server -> !usedWorkers.contains(server)).findAny();
worker.ifPresent(usedWorkers::add);
return worker;
}
public void release(ServerName worker) {
usedWorkers.remove(worker);
}
public void suspend(Procedure<?> proc) {
event.suspend();
event.suspendIfNotReady(proc);
}
public void wake(MasterProcedureScheduler scheduler) {
if (!event.isReady()) {
event.wake(scheduler);
}
}
}
private final ConcurrentMap<String, UsedReplayWorkersForPeer> usedWorkersByPeer =
new ConcurrentHashMap<>();
public SyncReplicationReplayWALManager(MasterServices services)
throws IOException, ReplicationException {
this.services = services;
this.serverManager = services.getServerManager();
this.fs = services.getMasterFileSystem().getWALFileSystem();
this.walRootDir = services.getMasterFileSystem().getWALRootDir();
this.remoteWALDir = new Path(this.walRootDir, ReplicationUtils.REMOTE_WAL_DIR_NAME);
this.workerStorage = new ZKSyncReplicationReplayWALWorkerStorage(services.getZooKeeper(),
services.getConfiguration());
checkReplayingWALDir();
}
private void checkReplayingWALDir() throws IOException, ReplicationException {
FileStatus[] files = fs.listStatus(remoteWALDir);
for (FileStatus file : files) {
String name = file.getPath().getName();
if (name.endsWith(REMOTE_WAL_REPLAY_SUFFIX)) {
String peerId = name.substring(0, name.length() - REMOTE_WAL_REPLAY_SUFFIX.length());
workers.put(peerId, workerStorage.getPeerWorkers(peerId));
}
}
}
MasterProcedureScheduler scheduler =
services.getMasterProcedureExecutor().getEnvironment().getProcedureScheduler();
serverManager.registerListener(new ServerListener() {
@Override
public void serverAdded(ServerName serverName) {
for (UsedReplayWorkersForPeer usedWorkers : usedWorkersByPeer.values()) {
synchronized (usedWorkers) {
usedWorkers.wake(scheduler);
}
}
}
});
}
public void registerPeer(String peerId) throws ReplicationException {
workers.put(peerId, new HashSet<>());
workerStorage.addPeer(peerId);
public void registerPeer(String peerId) {
usedWorkersByPeer.put(peerId, new UsedReplayWorkersForPeer(peerId));
}
public void unregisterPeer(String peerId) throws ReplicationException {
workers.remove(peerId);
workerStorage.removePeer(peerId);
public void unregisterPeer(String peerId) {
usedWorkersByPeer.remove(peerId);
}
public ServerName getPeerWorker(String peerId) throws ReplicationException {
Optional<ServerName> worker = Optional.empty();
ServerName workerServer = null;
synchronized (workerLock) {
worker = services.getServerManager().getOnlineServers().keySet().stream()
.filter(server -> !workers.get(peerId).contains(server)).findFirst();
if (worker.isPresent()) {
workerServer = worker.get();
workers.get(peerId).add(workerServer);
}
}
if (workerServer != null) {
workerStorage.addPeerWorker(peerId, workerServer);
}
return workerServer;
}
/**
* Get a worker for replaying remote wal for a given peer. If no worker is available, i.e., all
* the region servers have been used by others, a {@link ProcedureSuspendedException} will be
* thrown to suspend the procedure. It will be woken up later when there are available workers,
* either because another procedure releases a worker, or because a new region server joins the
* cluster.
*/
public ServerName acquirePeerWorker(String peerId, Procedure<?> proc)
throws ProcedureSuspendedException {
UsedReplayWorkersForPeer usedWorkers = usedWorkersByPeer.get(peerId);
synchronized (usedWorkers) {
Optional<ServerName> worker = usedWorkers.acquire(serverManager);
if (worker.isPresent()) {
return worker.get();
}
// no worker available right now, suspend the procedure
usedWorkers.suspend(proc);
}
throw new ProcedureSuspendedException();
}
public void removePeerWorker(String peerId, ServerName worker) throws ReplicationException {
synchronized (workerLock) {
workers.get(peerId).remove(worker);
}
workerStorage.removePeerWorker(peerId, worker);
}
public void releasePeerWorker(String peerId, ServerName worker,
MasterProcedureScheduler scheduler) {
UsedReplayWorkersForPeer usedWorkers = usedWorkersByPeer.get(peerId);
synchronized (usedWorkers) {
usedWorkers.release(worker);
usedWorkers.wake(scheduler);
}
}
/**
* Will only be called when loading procedures, where we need to construct the used worker set for
* each peer.
*/
public void addUsedPeerWorker(String peerId, ServerName worker) {
usedWorkersByPeer.get(peerId).used(worker);
}
public void createPeerRemoteWALDir(String peerId) throws IOException {
Path peerRemoteWALDir = getPeerRemoteWALDir(remoteWALDir, peerId);
if (!fs.exists(peerRemoteWALDir) && !fs.mkdirs(peerRemoteWALDir)) {
@ -132,23 +218,23 @@ public class SyncReplicationReplayWALManager {
deleteDir(dst, peerId);
if (!fs.rename(src, dst)) {
throw new IOException(
"Failed to rename dir from " + src + " to " + dst + " for peer id=" + peerId);
"Failed to rename dir from " + src + " to " + dst + " for peer id=" + peerId);
}
LOG.info("Renamed dir from {} to {} for peer id={}", src, dst, peerId);
} else if (!fs.exists(dst)) {
throw new IOException(
"Want to rename from " + src + " to " + dst + ", but they both do not exist");
"Want to rename from " + src + " to " + dst + ", but they both do not exist");
}
}
public void renameToPeerReplayWALDir(String peerId) throws IOException {
rename(getPeerRemoteWALDir(remoteWALDir, peerId), getPeerReplayWALDir(remoteWALDir, peerId),
peerId);
peerId);
}
public void renameToPeerSnapshotWALDir(String peerId) throws IOException {
rename(getPeerReplayWALDir(remoteWALDir, peerId), getPeerSnapshotWALDir(remoteWALDir, peerId),
peerId);
peerId);
}
public List<Path> getReplayWALsAndCleanUpUnusedFiles(String peerId) throws IOException {
@ -158,7 +244,7 @@ public class SyncReplicationReplayWALManager {
Path src = status.getPath();
String srcName = src.getName();
String dstName =
srcName.substring(0, srcName.length() - ReplicationUtils.RENAME_WAL_SUFFIX.length());
srcName.substring(0, srcName.length() - ReplicationUtils.RENAME_WAL_SUFFIX.length());
FSUtils.renameFile(fs, src, new Path(src.getParent(), dstName));
}
List<Path> wals = new ArrayList<>();
@ -175,14 +261,6 @@ public class SyncReplicationReplayWALManager {
return wals;
}
public void snapshotPeerReplayWALDir(String peerId) throws IOException {
Path peerReplayWALDir = getPeerReplayWALDir(remoteWALDir, peerId);
if (fs.exists(peerReplayWALDir) && !fs.delete(peerReplayWALDir, true)) {
throw new IOException(
"Failed to remove replay wals dir " + peerReplayWALDir + " for peer id=" + peerId);
}
}
private void deleteDir(Path dir, String peerId) throws IOException {
if (!fs.delete(dir, true) && fs.exists(dir)) {
throw new IOException("Failed to remove dir " + dir + " for peer id=" + peerId);

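Taken together, the manager now works like a small per-peer worker pool: acquisition filters online servers against the used set, and a ProcedureEvent replaces sleep-and-retry for waiters. A compact standalone model of that flow, where Runnable stands in for a suspended procedure's wakeup and String for a server name:

import java.util.HashSet;
import java.util.Optional;
import java.util.Set;

// Compact model of UsedReplayWorkersForPeer; types are simplified stand-ins.
final class WorkerPoolModel {
  private final Set<String> online = new HashSet<>();
  private final Set<String> used = new HashSet<>();
  private final Set<Runnable> waiters = new HashSet<>();

  // Pick any online server not already used; otherwise park the caller.
  synchronized Optional<String> acquire(Runnable waiterIfNone) {
    Optional<String> free = online.stream().filter(s -> !used.contains(s)).findAny();
    if (free.isPresent()) {
      used.add(free.get());
    } else {
      waiters.add(waiterIfNone); // suspend instead of sleep-and-retry
    }
    return free;
  }

  // Freeing a worker is one of the two wake sources...
  synchronized void release(String worker) {
    used.remove(worker);
    wakeAll();
  }

  // ...and a newly joined region server is the other (cf. serverAdded).
  synchronized void serverAdded(String server) {
    online.add(server);
    wakeAll();
  }

  private void wakeAll() {
    waiters.forEach(Runnable::run);
    waiters.clear();
  }
}

Both release() and serverAdded() wake the waiters, matching the two wake paths in releasePeerWorker() and the ServerListener registered in the constructor.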
View File

@ -18,28 +18,31 @@
package org.apache.hadoop.hbase.master.replication;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.SyncReplicationReplayWALState;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.SyncReplicationReplayWALStateData;
/**
* The procedure for replaying a set of remote wals. It will get an available region server and
* schedule a {@link SyncReplicationReplayWALRemoteProcedure} to actually send the request to the
* region server.
*/
@InterfaceAudience.Private
public class SyncReplicationReplayWALProcedure
extends AbstractPeerNoLockProcedure<SyncReplicationReplayWALState> {
private static final Logger LOG =
LoggerFactory.getLogger(SyncReplicationReplayWALProcedure.class);
LoggerFactory.getLogger(SyncReplicationReplayWALProcedure.class);
private ServerName worker = null;
@ -57,24 +60,10 @@ public class SyncReplicationReplayWALProcedure
protected Flow executeFromState(MasterProcedureEnv env, SyncReplicationReplayWALState state)
throws ProcedureSuspendedException {
SyncReplicationReplayWALManager syncReplicationReplayWALManager =
env.getMasterServices().getSyncReplicationReplayWALManager();
env.getMasterServices().getSyncReplicationReplayWALManager();
switch (state) {
case ASSIGN_WORKER:
try {
worker = syncReplicationReplayWALManager.getPeerWorker(peerId);
} catch (ReplicationException e) {
long backoff = ProcedureUtil.getBackoffTimeMs(attempts);
LOG.warn("Failed to get worker to replay wals {} for peer id={}, sleep {} secs and retry",
wals, peerId, backoff / 1000, e);
throw suspend(backoff);
}
if (worker == null) {
long backoff = ProcedureUtil.getBackoffTimeMs(attempts);
LOG.info("No worker to replay wals {} for peer id={}, sleep {} secs and retry", wals,
peerId, backoff / 1000);
throw suspend(backoff);
}
attempts = 0;
worker = syncReplicationReplayWALManager.acquirePeerWorker(peerId, this);
setNextState(SyncReplicationReplayWALState.DISPATCH_WALS_TO_WORKER);
return Flow.HAS_MORE_STATE;
case DISPATCH_WALS_TO_WORKER:
@ -88,19 +77,11 @@ public class SyncReplicationReplayWALProcedure
} catch (IOException e) {
long backoff = ProcedureUtil.getBackoffTimeMs(attempts);
LOG.warn("Failed to check whether replay wals {} finished for peer id={}" +
", sleep {} secs and retry",
wals, peerId, backoff / 1000, e);
", sleep {} secs and retry", wals, peerId, backoff / 1000, e);
throw suspend(backoff);
}
try {
syncReplicationReplayWALManager.removePeerWorker(peerId, worker);
} catch (ReplicationException e) {
long backoff = ProcedureUtil.getBackoffTimeMs(attempts);
LOG.warn("Failed to remove worker {} for peer id={}, sleep {} secs and retry", worker,
peerId, backoff / 1000, e);
throw suspend(backoff);
}
attempts = 0;
syncReplicationReplayWALManager.releasePeerWorker(peerId, worker,
env.getProcedureScheduler());
if (!finished) {
LOG.warn("Failed to replay wals {} for peer id={}, retry", wals, peerId);
setNextState(SyncReplicationReplayWALState.ASSIGN_WORKER);
@ -113,8 +94,7 @@ public class SyncReplicationReplayWALProcedure
}
@Override
protected void rollbackState(MasterProcedureEnv env,
SyncReplicationReplayWALState state)
protected void rollbackState(MasterProcedureEnv env, SyncReplicationReplayWALState state)
throws IOException, InterruptedException {
if (state == getInitialState()) {
return;
@ -128,8 +108,7 @@ public class SyncReplicationReplayWALProcedure
}
@Override
protected int getStateId(
SyncReplicationReplayWALState state) {
protected int getStateId(SyncReplicationReplayWALState state) {
return state.getNumber();
}
@ -139,26 +118,40 @@ public class SyncReplicationReplayWALProcedure
}
@Override
protected void serializeStateData(ProcedureStateSerializer serializer)
throws IOException {
protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
super.serializeStateData(serializer);
SyncReplicationReplayWALStateData.Builder builder =
SyncReplicationReplayWALStateData.newBuilder();
builder.setPeerId(peerId);
wals.stream().forEach(builder::addWal);
SyncReplicationReplayWALStateData.newBuilder().setPeerId(peerId).addAllWal(wals);
if (worker != null) {
builder.setWorker(ProtobufUtil.toServerName(worker));
}
serializer.serialize(builder.build());
}
@Override
protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
super.deserializeStateData(serializer);
SyncReplicationReplayWALStateData data =
serializer.deserialize(SyncReplicationReplayWALStateData.class);
serializer.deserialize(SyncReplicationReplayWALStateData.class);
peerId = data.getPeerId();
wals = new ArrayList<>();
data.getWalList().forEach(wals::add);
wals = data.getWalList();
if (data.hasWorker()) {
worker = ProtobufUtil.toServerName(data.getWorker());
}
}
@Override
public PeerOperationType getPeerOperationType() {
return PeerOperationType.SYNC_REPLICATION_REPLAY_WAL;
}
@Override
protected void afterReplay(MasterProcedureEnv env) {
// If the procedure is not finished and the worker is not null, we should add it to the used
// worker set, to prevent the worker being used by others.
if (worker != null && !isFinished()) {
env.getMasterServices().getSyncReplicationReplayWALManager().addUsedPeerWorker(peerId,
worker);
}
}
}

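The ASSIGN_WORKER change above is the heart of this commit: instead of computing a backoff and suspending for a fixed time, the procedure throws ProcedureSuspendedException and is woken exactly when capacity appears. A schematic comparison of the two strategies as a hedged sketch; the supplier and the exception type here are stand-ins, not HBase APIs:

import java.util.Optional;
import java.util.function.Supplier;

// Contrast between the old and new worker-assignment strategies.
final class AssignWorkerModel {

  static final class SuspendedException extends Exception {}

  // Old style: sleep-and-retry. A large backoff reacts slowly, a small one
  // floods the procedure wal with repeated state persists.
  static String pollForWorker(Supplier<Optional<String>> tryAcquire, long backoffMs)
      throws InterruptedException {
    Optional<String> worker;
    while (!(worker = tryAcquire.get()).isPresent()) {
      Thread.sleep(backoffMs);
    }
    return worker.get();
  }

  // New style: throw to suspend; the scheduler re-runs the procedure only
  // after releasePeerWorker() or serverAdded() wakes the per-peer event.
  static String acquireOrSuspend(Supplier<Optional<String>> tryAcquire)
      throws SuspendedException {
    return tryAcquire.get().orElseThrow(SuspendedException::new);
  }
}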
View File

@ -43,12 +43,15 @@ import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.ReplaySyncReplicationWALParameter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.SyncReplicationReplayWALRemoteStateData;
/**
* A remote procedure which is used to send replaying remote wal work to region server.
*/
@InterfaceAudience.Private
public class SyncReplicationReplayWALRemoteProcedure extends Procedure<MasterProcedureEnv>
implements RemoteProcedure<MasterProcedureEnv, ServerName>, PeerProcedureInterface {
private static final Logger LOG =
LoggerFactory.getLogger(SyncReplicationReplayWALRemoteProcedure.class);
LoggerFactory.getLogger(SyncReplicationReplayWALRemoteProcedure.class);
private String peerId;
@ -75,11 +78,11 @@ public class SyncReplicationReplayWALRemoteProcedure extends Procedure<MasterPro
@Override
public RemoteOperation remoteCallBuild(MasterProcedureEnv env, ServerName remote) {
ReplaySyncReplicationWALParameter.Builder builder =
ReplaySyncReplicationWALParameter.newBuilder();
ReplaySyncReplicationWALParameter.newBuilder();
builder.setPeerId(peerId);
wals.stream().forEach(builder::addWal);
return new ServerOperation(this, getProcId(), ReplaySyncReplicationWALCallable.class,
builder.build().toByteArray());
builder.build().toByteArray());
}
@Override
@ -116,8 +119,8 @@ public class SyncReplicationReplayWALRemoteProcedure extends Procedure<MasterPro
}
/**
* Only truncate wals one by one when task succeed. The parent procedure will check the first
* wal length to know whether this task succeed.
* Only truncate wals one by one when the task succeeds. The parent procedure will check the first
* wal length to know whether this task succeeded.
*/
private void truncateWALs(MasterProcedureEnv env) {
String firstWal = wals.get(0);
@ -159,10 +162,8 @@ public class SyncReplicationReplayWALRemoteProcedure extends Procedure<MasterPro
try {
env.getRemoteDispatcher().addOperationToNode(targetServer, this);
} catch (FailedRemoteDispatchException e) {
LOG.warn(
"Can not add remote operation for replay wals {} on {} for peer id={}, "
+ "this usually because the server is already dead",
wals, targetServer, peerId);
LOG.warn("Can not add remote operation for replay wals {} on {} for peer id={}, " +
"this usually because the server is already dead", wals, targetServer, peerId);
// Return directly and the parent procedure will assign a new worker to replay wals
return null;
}
@ -183,11 +184,10 @@ public class SyncReplicationReplayWALRemoteProcedure extends Procedure<MasterPro
}
@Override
protected void serializeStateData(ProcedureStateSerializer serializer)
throws IOException {
protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
SyncReplicationReplayWALRemoteStateData.Builder builder =
SyncReplicationReplayWALRemoteStateData.newBuilder().setPeerId(peerId)
.setTargetServer(ProtobufUtil.toServerName(targetServer));
SyncReplicationReplayWALRemoteStateData.newBuilder().setPeerId(peerId)
.setTargetServer(ProtobufUtil.toServerName(targetServer));
wals.stream().forEach(builder::addWal);
serializer.serialize(builder.build());
}
@ -195,7 +195,7 @@ public class SyncReplicationReplayWALRemoteProcedure extends Procedure<MasterPro
@Override
protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
SyncReplicationReplayWALRemoteStateData data =
serializer.deserialize(SyncReplicationReplayWALRemoteStateData.class);
serializer.deserialize(SyncReplicationReplayWALRemoteStateData.class);
peerId = data.getPeerId();
wals = new ArrayList<>();
data.getWalList().forEach(wals::add);

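For context on the truncation protocol in the javadoc above: the remote callable truncates each wal to zero length once it has been replayed, so the parent procedure can decide success by looking at the first wal's length. A minimal sketch against the Hadoop FileSystem API, with an illustrative helper name:

import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Sketch of the success check; the class and method names are illustrative.
final class ReplayCheckSketch {
  // A replayed wal is truncated to length 0 by the worker, so a zero-length
  // first wal means the previous dispatch finished successfully.
  static boolean isReplayFinished(FileSystem fs, Path firstWal) throws IOException {
    return fs.getFileStatus(firstWal).getLen() == 0;
  }
}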
View File

@ -1,108 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.replication;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ZKReplicationStorageBase;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
@InterfaceAudience.Private
public class ZKSyncReplicationReplayWALWorkerStorage extends ZKReplicationStorageBase {
public static final String WORKERS_ZNODE = "zookeeper.znode.sync.replication.replaywal.workers";
public static final String WORKERS_ZNODE_DEFAULT = "replaywal-workers";
/**
* The name of the znode that contains a list of workers to replay wal.
*/
private final String workersZNode;
public ZKSyncReplicationReplayWALWorkerStorage(ZKWatcher zookeeper, Configuration conf) {
super(zookeeper, conf);
String workersZNodeName = conf.get(WORKERS_ZNODE, WORKERS_ZNODE_DEFAULT);
workersZNode = ZNodePaths.joinZNode(replicationZNode, workersZNodeName);
}
private String getPeerNode(String peerId) {
return ZNodePaths.joinZNode(workersZNode, peerId);
}
public void addPeer(String peerId) throws ReplicationException {
try {
ZKUtil.createWithParents(zookeeper, getPeerNode(peerId));
} catch (KeeperException e) {
throw new ReplicationException(
"Failed to add peer id=" + peerId + " to replaywal-workers storage", e);
}
}
public void removePeer(String peerId) throws ReplicationException {
try {
ZKUtil.deleteNodeRecursively(zookeeper, getPeerNode(peerId));
} catch (KeeperException e) {
throw new ReplicationException(
"Failed to remove peer id=" + peerId + " to replaywal-workers storage", e);
}
}
private String getPeerWorkerNode(String peerId, ServerName worker) {
return ZNodePaths.joinZNode(getPeerNode(peerId), worker.getServerName());
}
public void addPeerWorker(String peerId, ServerName worker) throws ReplicationException {
try {
ZKUtil.createWithParents(zookeeper, getPeerWorkerNode(peerId, worker));
} catch (KeeperException e) {
throw new ReplicationException("Failed to add worker=" + worker + " for peer id=" + peerId,
e);
}
}
public void removePeerWorker(String peerId, ServerName worker) throws ReplicationException {
try {
ZKUtil.deleteNode(zookeeper, getPeerWorkerNode(peerId, worker));
} catch (KeeperException e) {
throw new ReplicationException("Failed to remove worker=" + worker + " for peer id=" + peerId,
e);
}
}
public Set<ServerName> getPeerWorkers(String peerId) throws ReplicationException {
try {
List<String> children = ZKUtil.listChildrenNoWatch(zookeeper, getPeerNode(peerId));
if (children == null) {
return new HashSet<>();
}
return children.stream().map(ServerName::valueOf).collect(Collectors.toSet());
} catch (KeeperException e) {
throw new ReplicationException("Failed to list workers for peer id=" + peerId, e);
}
}
}

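With the used-worker set now rebuilt from procedure state on restart, the znode bookkeeping this deleted class provided is no longer needed. For reference, the layout it used to maintain looked roughly like this, assuming the default /hbase base znode and the default znode names shown above:

/hbase/replication/replaywal-workers (workersZNode)
/hbase/replication/replaywal-workers/<peerId> (one child per registered peer)
/hbase/replication/replaywal-workers/<peerId>/<serverName> (one child per used worker)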
View File

@ -0,0 +1,179 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.replication;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.ServerMetrics;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.master.MasterFileSystem;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.ServerListener;
import org.apache.hadoop.hbase.master.ServerManager;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.NoopProcedure;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
@Category({ MasterTests.class, SmallTests.class })
public class TestSyncReplicationReplayWALManager {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestSyncReplicationReplayWALManager.class);
private static HBaseTestingUtility UTIL = new HBaseTestingUtility();
private SyncReplicationReplayWALManager manager;
private MasterProcedureScheduler scheduler;
private Set<ServerName> onlineServers;
private List<ServerListener> listeners;
private Queue<Procedure<?>> wokenProcedures;
@Before
public void setUp() throws IOException, ReplicationException {
wokenProcedures = new ArrayDeque<>();
onlineServers = new HashSet<>();
listeners = new ArrayList<>();
ServerManager serverManager = mock(ServerManager.class);
doAnswer(inv -> listeners.add(inv.getArgument(0))).when(serverManager)
.registerListener(any(ServerListener.class));
ServerMetrics serverMetrics = mock(ServerMetrics.class);
doAnswer(inv -> onlineServers.stream()
.collect(Collectors.toMap(Function.identity(), k -> serverMetrics))).when(serverManager)
.getOnlineServers();
MasterFileSystem mfs = mock(MasterFileSystem.class);
when(mfs.getFileSystem()).thenReturn(UTIL.getTestFileSystem());
when(mfs.getWALRootDir()).thenReturn(new Path("/"));
scheduler = mock(MasterProcedureScheduler.class);
doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
ProcedureEvent<?> event = ((ProcedureEvent<?>[]) invocation.getArgument(0))[0];
event.wakeInternal(new MasterProcedureScheduler(pid -> null) {
@Override
public void addFront(Iterator<Procedure> procedureIterator) {
procedureIterator.forEachRemaining(wokenProcedures::add);
}
});
return null;
}
}).when(scheduler).wakeEvents(any(ProcedureEvent[].class));
MasterProcedureEnv env = mock(MasterProcedureEnv.class);
when(env.getProcedureScheduler()).thenReturn(scheduler);
ProcedureExecutor<MasterProcedureEnv> procExec = mock(ProcedureExecutor.class);
when(procExec.getEnvironment()).thenReturn(env);
MasterServices services = mock(MasterServices.class);
when(services.getServerManager()).thenReturn(serverManager);
when(services.getMasterFileSystem()).thenReturn(mfs);
when(services.getMasterProcedureExecutor()).thenReturn(procExec);
manager = new SyncReplicationReplayWALManager(services);
assertEquals(1, listeners.size());
}
@Test
public void testUsedWorkers() throws ProcedureSuspendedException {
String peerId1 = "1";
String peerId2 = "2";
ServerName sn1 = ServerName.valueOf("host1", 123, 12345);
ServerName sn2 = ServerName.valueOf("host2", 234, 23456);
ServerName sn3 = ServerName.valueOf("host3", 345, 34567);
onlineServers.add(sn1);
manager.registerPeer(peerId1);
manager.registerPeer(peerId2);
// confirm that different peer ids do not affect each other
assertEquals(sn1, manager.acquirePeerWorker(peerId1, new NoopProcedure<>()));
assertEquals(sn1, manager.acquirePeerWorker(peerId2, new NoopProcedure<>()));
onlineServers.add(sn2);
assertEquals(sn2, manager.acquirePeerWorker(peerId1, new NoopProcedure<>()));
assertEquals(sn2, manager.acquirePeerWorker(peerId2, new NoopProcedure<>()));
NoopProcedure<?> proc = new NoopProcedure<>();
try {
manager.acquirePeerWorker(peerId1, proc);
fail("Should suspend");
} catch (ProcedureSuspendedException e) {
// expected
}
manager.releasePeerWorker(peerId1, sn1, scheduler);
assertEquals(1, wokenProcedures.size());
assertSame(proc, wokenProcedures.poll());
assertEquals(sn1, manager.acquirePeerWorker(peerId1, new NoopProcedure<>()));
NoopProcedure<?> proc1 = new NoopProcedure<>();
NoopProcedure<?> proc2 = new NoopProcedure<>();
try {
manager.acquirePeerWorker(peerId1, proc1);
fail("Should suspend");
} catch (ProcedureSuspendedException e) {
// expected
}
try {
manager.acquirePeerWorker(peerId1, proc2);
fail("Should suspend");
} catch (ProcedureSuspendedException e) {
// expected
}
listeners.get(0).serverAdded(sn3);
assertEquals(2, wokenProcedures.size());
assertSame(proc2, wokenProcedures.poll());
assertSame(proc1, wokenProcedures.poll());
}
}

View File

@ -0,0 +1,94 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.replication;
import java.io.IOException;
import java.io.UncheckedIOException;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureTestingUtility;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.hadoop.hbase.replication.SyncReplicationTestBase;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category({ MasterTests.class, LargeTests.class })
public class TestTransitPeerSyncReplicationStateProcedureRetry extends SyncReplicationTestBase {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestTransitPeerSyncReplicationStateProcedureRetry.class);
@BeforeClass
public static void setUp() throws Exception {
UTIL2.getConfiguration().setInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, 1);
SyncReplicationTestBase.setUp();
}
@Test
public void testRecoveryAndDoubleExecution() throws Exception {
UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
SyncReplicationState.STANDBY);
UTIL1.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
SyncReplicationState.ACTIVE);
UTIL1.getAdmin().disableReplicationPeer(PEER_ID);
write(UTIL1, 0, 100);
Thread.sleep(2000);
// peer is disabled so no data have been replicated
verifyNotReplicatedThroughRegion(UTIL2, 0, 100);
// transit the ACTIVE cluster to DOWNGRADE_ACTIVE first to avoid too many error logs.
UTIL1.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
SyncReplicationState.DOWNGRADE_ACTIVE);
HMaster master = UTIL2.getHBaseCluster().getMaster();
ProcedureExecutor<MasterProcedureEnv> procExec = master.getMasterProcedureExecutor();
// Enable test flags and then queue the procedure.
ProcedureTestingUtility.waitNoProcedureRunning(procExec);
ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, true);
Thread t = new Thread() {
@Override
public void run() {
try {
UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
SyncReplicationState.DOWNGRADE_ACTIVE);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
};
t.start();
UTIL2.waitFor(30000, () -> procExec.getProcedures().stream()
.anyMatch(p -> p instanceof TransitPeerSyncReplicationStateProcedure && !p.isFinished()));
long procId = procExec.getProcedures().stream()
.filter(p -> p instanceof TransitPeerSyncReplicationStateProcedure && !p.isFinished())
.mapToLong(Procedure::getProcId).min().getAsLong();
MasterProcedureTestingUtility.testRecoveryAndDoubleExecution(procExec, procId);
ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, false);
}
}