HBASE-20569 NPE in RecoverStandbyProcedure.execute
This commit is contained in:
parent
7448b045cc
commit
44ca13fe07
|
@ -486,22 +486,34 @@ message TransitPeerSyncReplicationStateStateData {
|
|||
|
||||
enum RecoverStandbyState {
|
||||
RENAME_SYNC_REPLICATION_WALS_DIR = 1;
|
||||
INIT_WORKERS = 2;
|
||||
DISPATCH_TASKS = 3;
|
||||
SNAPSHOT_SYNC_REPLICATION_WALS_DIR = 4;
|
||||
REGISTER_PEER_TO_WORKER_STORAGE = 2;
|
||||
DISPATCH_WALS = 3;
|
||||
UNREGISTER_PEER_FROM_WORKER_STORAGE = 4;
|
||||
SNAPSHOT_SYNC_REPLICATION_WALS_DIR = 5;
|
||||
}
|
||||
|
||||
// States of the master-side SyncReplicationReplayWALProcedure state machine.
enum SyncReplicationReplayWALState {
|
||||
// Initial state: ask the replay WAL manager for a worker server for the peer.
ASSIGN_WORKER = 1;
|
||||
// Schedule a SyncReplicationReplayWALRemoteProcedure child on the chosen worker.
DISPATCH_WALS_TO_WORKER = 2;
|
||||
// Give the worker back and check whether the replay finished; if not, go back to ASSIGN_WORKER.
RELEASE_WORKER = 3;
|
||||
}
|
||||
|
||||
message RecoverStandbyStateData {
|
||||
required string peer_id = 1;
|
||||
required bool serial = 1;
|
||||
}
|
||||
|
||||
message ReplaySyncReplicationWALStateData {
|
||||
message SyncReplicationReplayWALStateData {
|
||||
required string peer_id = 1;
|
||||
required string wal = 2;
|
||||
optional ServerName target_server = 3;
|
||||
repeated string wal = 2;
|
||||
}
|
||||
|
||||
// State data message imported by SyncReplicationReplayWALRemoteProcedure.
message SyncReplicationReplayWALRemoteStateData {
|
||||
// Id of the replication peer whose WALs are replayed.
required string peer_id = 1;
|
||||
// WALs to replay (one entry per WAL).
repeated string wal = 2;
|
||||
// Server the replay is dispatched to.
required ServerName target_server = 3;
|
||||
}
|
||||
|
||||
message ReplaySyncReplicationWALParameter {
|
||||
required string peer_id = 1;
|
||||
required string wal = 2;
|
||||
repeated string wal = 2;
|
||||
}
|
||||
|
|
|
@ -138,8 +138,8 @@ import org.apache.hadoop.hbase.master.replication.AddPeerProcedure;
|
|||
import org.apache.hadoop.hbase.master.replication.DisablePeerProcedure;
|
||||
import org.apache.hadoop.hbase.master.replication.EnablePeerProcedure;
|
||||
import org.apache.hadoop.hbase.master.replication.RemovePeerProcedure;
|
||||
import org.apache.hadoop.hbase.master.replication.ReplaySyncReplicationWALManager;
|
||||
import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager;
|
||||
import org.apache.hadoop.hbase.master.replication.SyncReplicationReplayWALManager;
|
||||
import org.apache.hadoop.hbase.master.replication.TransitPeerSyncReplicationStateProcedure;
|
||||
import org.apache.hadoop.hbase.master.replication.UpdatePeerConfigProcedure;
|
||||
import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
|
||||
|
@ -343,7 +343,7 @@ public class HMaster extends HRegionServer implements MasterServices {
|
|||
// manager of replication
|
||||
private ReplicationPeerManager replicationPeerManager;
|
||||
|
||||
private ReplaySyncReplicationWALManager replaySyncReplicationWALManager;
|
||||
private SyncReplicationReplayWALManager syncReplicationReplayWALManager;
|
||||
|
||||
// buffer for "fatal error" notices from region servers
|
||||
// in the cluster. This is only used for assisting
|
||||
|
@ -754,6 +754,7 @@ public class HMaster extends HRegionServer implements MasterServices {
|
|||
this.splitOrMergeTracker.start();
|
||||
|
||||
this.replicationPeerManager = ReplicationPeerManager.create(zooKeeper, conf);
|
||||
this.syncReplicationReplayWALManager = new SyncReplicationReplayWALManager(this);
|
||||
|
||||
this.drainingServerTracker = new DrainingServerTracker(zooKeeper, this, this.serverManager);
|
||||
this.drainingServerTracker.start();
|
||||
|
@ -852,7 +853,6 @@ public class HMaster extends HRegionServer implements MasterServices {
|
|||
initializeMemStoreChunkCreator();
|
||||
this.fileSystemManager = new MasterFileSystem(conf);
|
||||
this.walManager = new MasterWalManager(this);
|
||||
this.replaySyncReplicationWALManager = new ReplaySyncReplicationWALManager(this);
|
||||
|
||||
// enable table descriptors cache
|
||||
this.tableDescriptors.setCacheOn();
|
||||
|
@ -3764,7 +3764,7 @@ public class HMaster extends HRegionServer implements MasterServices {
|
|||
}
|
||||
|
||||
@Override
|
||||
public ReplaySyncReplicationWALManager getReplaySyncReplicationWALManager() {
|
||||
return this.replaySyncReplicationWALManager;
|
||||
public SyncReplicationReplayWALManager getSyncReplicationReplayWALManager() {
|
||||
return this.syncReplicationReplayWALManager;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -38,8 +38,8 @@ import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
|
|||
import org.apache.hadoop.hbase.master.locking.LockManager;
|
||||
import org.apache.hadoop.hbase.master.normalizer.RegionNormalizer;
|
||||
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
|
||||
import org.apache.hadoop.hbase.master.replication.ReplaySyncReplicationWALManager;
|
||||
import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager;
|
||||
import org.apache.hadoop.hbase.master.replication.SyncReplicationReplayWALManager;
|
||||
import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
|
||||
import org.apache.hadoop.hbase.procedure.MasterProcedureManagerHost;
|
||||
import org.apache.hadoop.hbase.procedure2.LockedResource;
|
||||
|
@ -462,9 +462,9 @@ public interface MasterServices extends Server {
|
|||
ReplicationPeerManager getReplicationPeerManager();
|
||||
|
||||
/**
|
||||
* Returns the {@link ReplaySyncReplicationWALManager}.
|
||||
* Returns the {@link SyncReplicationReplayWALManager}.
|
||||
*/
|
||||
ReplaySyncReplicationWALManager getReplaySyncReplicationWALManager();
|
||||
SyncReplicationReplayWALManager getSyncReplicationReplayWALManager();
|
||||
|
||||
/**
|
||||
* Update the peerConfig for the specified peer
|
||||
|
|
|
@ -207,7 +207,8 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
|
|||
// check if the next procedure is still a child.
|
||||
// if not, remove the rq from the fairq and go back to the xlock state
|
||||
Procedure<?> nextProc = rq.peek();
|
||||
if (nextProc != null && !Procedure.haveSameParent(nextProc, pollResult)) {
|
||||
if (nextProc != null && !Procedure.haveSameParent(nextProc, pollResult)
|
||||
&& nextProc.getRootProcId() != pollResult.getRootProcId()) {
|
||||
removeFromRunQueue(fairq, rq);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -24,7 +24,7 @@ public interface PeerProcedureInterface {
|
|||
|
||||
enum PeerOperationType {
|
||||
ADD, REMOVE, ENABLE, DISABLE, UPDATE_CONFIG, REFRESH, TRANSIT_SYNC_REPLICATION_STATE,
|
||||
RECOVER_STANDBY, REPLAY_SYNC_REPLICATION_WAL
|
||||
RECOVER_STANDBY, SYNC_REPLICATION_REPLAY_WAL, SYNC_REPLICATION_REPLAY_WAL_REMOTE
|
||||
}
|
||||
|
||||
String getPeerId();
|
||||
|
|
|
@ -50,6 +50,7 @@ class PeerQueue extends Queue<String> {
|
|||
|
||||
private static boolean requirePeerExclusiveLock(PeerProcedureInterface proc) {
|
||||
return proc.getPeerOperationType() != PeerOperationType.REFRESH
|
||||
&& proc.getPeerOperationType() != PeerOperationType.REPLAY_SYNC_REPLICATION_WAL;
|
||||
&& proc.getPeerOperationType() != PeerOperationType.SYNC_REPLICATION_REPLAY_WAL
|
||||
&& proc.getPeerOperationType() != PeerOperationType.SYNC_REPLICATION_REPLAY_WAL_REMOTE;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,60 +18,79 @@
|
|||
package org.apache.hadoop.hbase.master.replication;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
|
||||
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
|
||||
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
|
||||
import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationException;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RecoverStandbyState;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RecoverStandbyStateData;
|
||||
|
||||
@InterfaceAudience.Private
|
||||
public class RecoverStandbyProcedure extends AbstractPeerProcedure<RecoverStandbyState> {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(RecoverStandbyProcedure.class);
|
||||
|
||||
private boolean serial;
|
||||
|
||||
public RecoverStandbyProcedure() {
|
||||
}
|
||||
|
||||
public RecoverStandbyProcedure(String peerId) {
|
||||
public RecoverStandbyProcedure(String peerId, boolean serial) {
|
||||
super(peerId);
|
||||
this.serial = serial;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Flow executeFromState(MasterProcedureEnv env, RecoverStandbyState state)
|
||||
throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
|
||||
ReplaySyncReplicationWALManager replaySyncReplicationWALManager =
|
||||
env.getMasterServices().getReplaySyncReplicationWALManager();
|
||||
SyncReplicationReplayWALManager syncReplicationReplayWALManager =
|
||||
env.getMasterServices().getSyncReplicationReplayWALManager();
|
||||
switch (state) {
|
||||
case RENAME_SYNC_REPLICATION_WALS_DIR:
|
||||
try {
|
||||
replaySyncReplicationWALManager.renameToPeerReplayWALDir(peerId);
|
||||
syncReplicationReplayWALManager.renameToPeerReplayWALDir(peerId);
|
||||
} catch (IOException e) {
|
||||
LOG.warn("Failed to rename remote wal dir for peer id={}", peerId, e);
|
||||
setFailure("master-recover-standby", e);
|
||||
return Flow.NO_MORE_STATE;
|
||||
}
|
||||
setNextState(RecoverStandbyState.INIT_WORKERS);
|
||||
setNextState(RecoverStandbyState.REGISTER_PEER_TO_WORKER_STORAGE);
|
||||
return Flow.HAS_MORE_STATE;
|
||||
case INIT_WORKERS:
|
||||
replaySyncReplicationWALManager.initPeerWorkers(peerId);
|
||||
setNextState(RecoverStandbyState.DISPATCH_TASKS);
|
||||
case REGISTER_PEER_TO_WORKER_STORAGE:
|
||||
try {
|
||||
syncReplicationReplayWALManager.registerPeer(peerId);
|
||||
} catch (ReplicationException e) {
|
||||
LOG.warn("Failed to register peer to worker storage for peer id={}, retry", peerId, e);
|
||||
throw new ProcedureYieldException();
|
||||
}
|
||||
setNextState(RecoverStandbyState.DISPATCH_WALS);
|
||||
return Flow.HAS_MORE_STATE;
|
||||
case DISPATCH_TASKS:
|
||||
addChildProcedure(getReplayWALs(replaySyncReplicationWALManager).stream()
|
||||
.map(wal -> new ReplaySyncReplicationWALProcedure(peerId,
|
||||
replaySyncReplicationWALManager.removeWALRootPath(wal)))
|
||||
.toArray(ReplaySyncReplicationWALProcedure[]::new));
|
||||
case DISPATCH_WALS:
|
||||
dispathWals(syncReplicationReplayWALManager);
|
||||
setNextState(RecoverStandbyState.UNREGISTER_PEER_FROM_WORKER_STORAGE);
|
||||
return Flow.HAS_MORE_STATE;
|
||||
case UNREGISTER_PEER_FROM_WORKER_STORAGE:
|
||||
try {
|
||||
syncReplicationReplayWALManager.unregisterPeer(peerId);
|
||||
} catch (ReplicationException e) {
|
||||
LOG.warn("Failed to unregister peer from worker storage for peer id={}, retry", peerId,
|
||||
e);
|
||||
throw new ProcedureYieldException();
|
||||
}
|
||||
setNextState(RecoverStandbyState.SNAPSHOT_SYNC_REPLICATION_WALS_DIR);
|
||||
return Flow.HAS_MORE_STATE;
|
||||
case SNAPSHOT_SYNC_REPLICATION_WALS_DIR:
|
||||
try {
|
||||
replaySyncReplicationWALManager.renameToPeerSnapshotWALDir(peerId);
|
||||
syncReplicationReplayWALManager.renameToPeerSnapshotWALDir(peerId);
|
||||
} catch (IOException e) {
|
||||
LOG.warn("Failed to cleanup replay wals dir for peer id={}, , retry", peerId, e);
|
||||
throw new ProcedureYieldException();
|
||||
|
@ -82,10 +101,14 @@ public class RecoverStandbyProcedure extends AbstractPeerProcedure<RecoverStandb
|
|||
}
|
||||
}
|
||||
|
||||
private List<Path> getReplayWALs(ReplaySyncReplicationWALManager replaySyncReplicationWALManager)
|
||||
// TODO: dispatch wals by region server when serial is true and sort wals
|
||||
private void dispathWals(SyncReplicationReplayWALManager syncReplicationReplayWALManager)
|
||||
throws ProcedureYieldException {
|
||||
try {
|
||||
return replaySyncReplicationWALManager.getReplayWALsAndCleanUpUnusedFiles(peerId);
|
||||
List<Path> wals = syncReplicationReplayWALManager.getReplayWALsAndCleanUpUnusedFiles(peerId);
|
||||
addChildProcedure(wals.stream().map(wal -> new SyncReplicationReplayWALProcedure(peerId,
|
||||
Arrays.asList(syncReplicationReplayWALManager.removeWALRootPath(wal))))
|
||||
.toArray(SyncReplicationReplayWALProcedure[]::new));
|
||||
} catch (IOException e) {
|
||||
LOG.warn("Failed to get replay wals for peer id={}, , retry", peerId, e);
|
||||
throw new ProcedureYieldException();
|
||||
|
@ -111,4 +134,17 @@ public class RecoverStandbyProcedure extends AbstractPeerProcedure<RecoverStandb
|
|||
public PeerOperationType getPeerOperationType() {
|
||||
return PeerOperationType.RECOVER_STANDBY;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
|
||||
super.serializeStateData(serializer);
|
||||
serializer.serialize(RecoverStandbyStateData.newBuilder().setSerial(serial).build());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
|
||||
super.deserializeStateData(serializer);
|
||||
RecoverStandbyStateData data = serializer.deserialize(RecoverStandbyStateData.class);
|
||||
serial = data.getSerial();
|
||||
}
|
||||
}
|
|
@ -67,11 +67,10 @@ public class RemovePeerProcedure extends ModifyPeerProcedure {
|
|||
}
|
||||
|
||||
private void removeRemoteWALs(MasterProcedureEnv env) throws IOException {
|
||||
env.getMasterServices().getReplaySyncReplicationWALManager().removePeerRemoteWALs(peerId);
|
||||
env.getMasterServices().getSyncReplicationReplayWALManager().removePeerRemoteWALs(peerId);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void postPeerModification(MasterProcedureEnv env)
|
||||
@Override protected void postPeerModification(MasterProcedureEnv env)
|
||||
throws IOException, ReplicationException {
|
||||
if (peerConfig.isSyncReplication()) {
|
||||
removeRemoteWALs(env);
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.master.replication;
|
||||
|
||||
import static org.apache.hadoop.hbase.replication.ReplicationUtils.REMOTE_WAL_REPLAY_SUFFIX;
|
||||
import static org.apache.hadoop.hbase.replication.ReplicationUtils.getPeerRemoteWALDir;
|
||||
import static org.apache.hadoop.hbase.replication.ReplicationUtils.getPeerReplayWALDir;
|
||||
import static org.apache.hadoop.hbase.replication.ReplicationUtils.getPeerSnapshotWALDir;
|
||||
|
@ -24,16 +25,18 @@ import static org.apache.hadoop.hbase.replication.ReplicationUtils.getPeerSnapsh
|
|||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.master.MasterServices;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationException;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationUtils;
|
||||
import org.apache.hadoop.hbase.util.FSUtils;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
@ -43,10 +46,10 @@ import org.slf4j.LoggerFactory;
|
|||
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
|
||||
|
||||
@InterfaceAudience.Private
|
||||
public class ReplaySyncReplicationWALManager {
|
||||
public class SyncReplicationReplayWALManager {
|
||||
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(ReplaySyncReplicationWALManager.class);
|
||||
LoggerFactory.getLogger(SyncReplicationReplayWALManager.class);
|
||||
|
||||
private final MasterServices services;
|
||||
|
||||
|
@ -56,15 +59,67 @@ public class ReplaySyncReplicationWALManager {
|
|||
|
||||
private final Path remoteWALDir;
|
||||
|
||||
private final Map<String, BlockingQueue<ServerName>> availServers = new HashMap<>();
|
||||
private final ZKSyncReplicationReplayWALWorkerStorage workerStorage;
|
||||
|
||||
public ReplaySyncReplicationWALManager(MasterServices services) {
|
||||
private final Map<String, Set<ServerName>> workers = new HashMap<>();
|
||||
|
||||
private final Object workerLock = new Object();
|
||||
|
||||
public SyncReplicationReplayWALManager(MasterServices services)
|
||||
throws IOException, ReplicationException {
|
||||
this.services = services;
|
||||
this.fs = services.getMasterFileSystem().getWALFileSystem();
|
||||
this.walRootDir = services.getMasterFileSystem().getWALRootDir();
|
||||
this.remoteWALDir = new Path(this.walRootDir, ReplicationUtils.REMOTE_WAL_DIR_NAME);
|
||||
this.workerStorage = new ZKSyncReplicationReplayWALWorkerStorage(services.getZooKeeper(),
|
||||
services.getConfiguration());
|
||||
checkReplayingWALDir();
|
||||
}
|
||||
|
||||
private void checkReplayingWALDir() throws IOException, ReplicationException {
|
||||
FileStatus[] files = fs.listStatus(remoteWALDir);
|
||||
for (FileStatus file : files) {
|
||||
String name = file.getPath().getName();
|
||||
if (name.endsWith(REMOTE_WAL_REPLAY_SUFFIX)) {
|
||||
String peerId = name.substring(0, name.length() - REMOTE_WAL_REPLAY_SUFFIX.length());
|
||||
workers.put(peerId, workerStorage.getPeerWorkers(peerId));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void registerPeer(String peerId) throws ReplicationException {
|
||||
workers.put(peerId, new HashSet<>());
|
||||
workerStorage.addPeer(peerId);
|
||||
}
|
||||
|
||||
public void unregisterPeer(String peerId) throws ReplicationException {
|
||||
workers.remove(peerId);
|
||||
workerStorage.removePeer(peerId);
|
||||
}
|
||||
|
||||
public ServerName getPeerWorker(String peerId) throws ReplicationException {
|
||||
Optional<ServerName> worker = Optional.empty();
|
||||
ServerName workerServer = null;
|
||||
synchronized (workerLock) {
|
||||
worker = services.getServerManager().getOnlineServers().keySet().stream()
|
||||
.filter(server -> !workers.get(peerId).contains(server)).findFirst();
|
||||
if (worker.isPresent()) {
|
||||
workerServer = worker.get();
|
||||
workers.get(peerId).add(workerServer);
|
||||
}
|
||||
}
|
||||
if (workerServer != null) {
|
||||
workerStorage.addPeerWorker(peerId, workerServer);
|
||||
}
|
||||
return workerServer;
|
||||
}
|
||||
|
||||
public void removePeerWorker(String peerId, ServerName worker) throws ReplicationException {
|
||||
synchronized (workerLock) {
|
||||
workers.get(peerId).remove(worker);
|
||||
}
|
||||
workerStorage.removePeerWorker(peerId, worker);
|
||||
}
|
||||
public void createPeerRemoteWALDir(String peerId) throws IOException {
|
||||
Path peerRemoteWALDir = getPeerRemoteWALDir(remoteWALDir, peerId);
|
||||
if (!fs.exists(peerRemoteWALDir) && !fs.mkdirs(peerRemoteWALDir)) {
|
||||
|
@ -77,23 +132,23 @@ public class ReplaySyncReplicationWALManager {
|
|||
deleteDir(dst, peerId);
|
||||
if (!fs.rename(src, dst)) {
|
||||
throw new IOException(
|
||||
"Failed to rename dir from " + src + " to " + dst + " for peer id=" + peerId);
|
||||
"Failed to rename dir from " + src + " to " + dst + " for peer id=" + peerId);
|
||||
}
|
||||
LOG.info("Renamed dir from {} to {} for peer id={}", src, dst, peerId);
|
||||
} else if (!fs.exists(dst)) {
|
||||
throw new IOException(
|
||||
"Want to rename from " + src + " to " + dst + ", but they both do not exist");
|
||||
"Want to rename from " + src + " to " + dst + ", but they both do not exist");
|
||||
}
|
||||
}
|
||||
|
||||
public void renameToPeerReplayWALDir(String peerId) throws IOException {
|
||||
rename(getPeerRemoteWALDir(remoteWALDir, peerId), getPeerReplayWALDir(remoteWALDir, peerId),
|
||||
peerId);
|
||||
peerId);
|
||||
}
|
||||
|
||||
public void renameToPeerSnapshotWALDir(String peerId) throws IOException {
|
||||
rename(getPeerReplayWALDir(remoteWALDir, peerId), getPeerSnapshotWALDir(remoteWALDir, peerId),
|
||||
peerId);
|
||||
peerId);
|
||||
}
|
||||
|
||||
public List<Path> getReplayWALsAndCleanUpUnusedFiles(String peerId) throws IOException {
|
||||
|
@ -103,7 +158,7 @@ public class ReplaySyncReplicationWALManager {
|
|||
Path src = status.getPath();
|
||||
String srcName = src.getName();
|
||||
String dstName =
|
||||
srcName.substring(0, srcName.length() - ReplicationUtils.RENAME_WAL_SUFFIX.length());
|
||||
srcName.substring(0, srcName.length() - ReplicationUtils.RENAME_WAL_SUFFIX.length());
|
||||
FSUtils.renameFile(fs, src, new Path(src.getParent(), dstName));
|
||||
}
|
||||
List<Path> wals = new ArrayList<>();
|
||||
|
@ -140,28 +195,22 @@ public class ReplaySyncReplicationWALManager {
|
|||
deleteDir(getPeerSnapshotWALDir(remoteWALDir, peerId), peerId);
|
||||
}
|
||||
|
||||
public void initPeerWorkers(String peerId) {
|
||||
BlockingQueue<ServerName> servers = new LinkedBlockingQueue<>();
|
||||
services.getServerManager().getOnlineServers().keySet()
|
||||
.forEach(server -> servers.offer(server));
|
||||
availServers.put(peerId, servers);
|
||||
}
|
||||
|
||||
public ServerName getAvailServer(String peerId, long timeout, TimeUnit unit)
|
||||
throws InterruptedException {
|
||||
return availServers.get(peerId).poll(timeout, unit);
|
||||
}
|
||||
|
||||
public void addAvailServer(String peerId, ServerName server) {
|
||||
availServers.get(peerId).offer(server);
|
||||
}
|
||||
|
||||
public String removeWALRootPath(Path path) {
|
||||
String pathStr = path.toString();
|
||||
// remove the "/" too.
|
||||
return pathStr.substring(walRootDir.toString().length() + 1);
|
||||
}
|
||||
|
||||
public void finishReplayWAL(String wal) throws IOException {
|
||||
Path walPath = new Path(walRootDir, wal);
|
||||
fs.truncate(walPath, 0);
|
||||
}
|
||||
|
||||
public boolean isReplayWALFinished(String wal) throws IOException {
|
||||
Path walPath = new Path(walRootDir, wal);
|
||||
return fs.getFileStatus(walPath).getLen() == 0;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public Path getRemoteWALDir() {
|
||||
return remoteWALDir;
|
|
@ -0,0 +1,164 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.master.replication;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
|
||||
import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface;
|
||||
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
|
||||
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
|
||||
import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
|
||||
import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationException;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.SyncReplicationReplayWALState;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.SyncReplicationReplayWALStateData;
|
||||
|
||||
@InterfaceAudience.Private
|
||||
public class SyncReplicationReplayWALProcedure
|
||||
extends StateMachineProcedure<MasterProcedureEnv, SyncReplicationReplayWALState>
|
||||
implements PeerProcedureInterface {
|
||||
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(SyncReplicationReplayWALProcedure.class);
|
||||
|
||||
private String peerId;
|
||||
|
||||
private ServerName worker = null;
|
||||
|
||||
private List<String> wals;
|
||||
|
||||
public SyncReplicationReplayWALProcedure() {
|
||||
}
|
||||
|
||||
public SyncReplicationReplayWALProcedure(String peerId, List<String> wals) {
|
||||
this.peerId = peerId;
|
||||
this.wals = wals;
|
||||
}
|
||||
|
||||
@Override protected Flow executeFromState(MasterProcedureEnv env,
|
||||
SyncReplicationReplayWALState state)
|
||||
throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
|
||||
SyncReplicationReplayWALManager syncReplicationReplayWALManager =
|
||||
env.getMasterServices().getSyncReplicationReplayWALManager();
|
||||
switch (state) {
|
||||
case ASSIGN_WORKER:
|
||||
try {
|
||||
worker = syncReplicationReplayWALManager.getPeerWorker(peerId);
|
||||
} catch (ReplicationException e) {
|
||||
LOG.info("Failed to get worker to replay wals {} for peer id={}, retry", wals, peerId);
|
||||
throw new ProcedureYieldException();
|
||||
}
|
||||
if (worker == null) {
|
||||
LOG.info("No worker to replay wals {} for peer id={}, retry", wals, peerId);
|
||||
setNextState(SyncReplicationReplayWALState.ASSIGN_WORKER);
|
||||
} else {
|
||||
setNextState(SyncReplicationReplayWALState.DISPATCH_WALS_TO_WORKER);
|
||||
}
|
||||
return Flow.HAS_MORE_STATE;
|
||||
case DISPATCH_WALS_TO_WORKER:
|
||||
addChildProcedure(new SyncReplicationReplayWALRemoteProcedure(peerId, wals, worker));
|
||||
setNextState(SyncReplicationReplayWALState.RELEASE_WORKER);
|
||||
return Flow.HAS_MORE_STATE;
|
||||
case RELEASE_WORKER:
|
||||
boolean finished = false;
|
||||
try {
|
||||
finished = syncReplicationReplayWALManager.isReplayWALFinished(wals.get(0));
|
||||
} catch (IOException e) {
|
||||
LOG.info("Failed to check whether replay wals {} finished for peer id={}", wals, peerId);
|
||||
throw new ProcedureYieldException();
|
||||
}
|
||||
try {
|
||||
syncReplicationReplayWALManager.removePeerWorker(peerId, worker);
|
||||
} catch (ReplicationException e) {
|
||||
LOG.info("Failed to remove worker for peer id={}, retry", peerId);
|
||||
throw new ProcedureYieldException();
|
||||
}
|
||||
if (!finished) {
|
||||
LOG.info("Failed to replay wals {} for peer id={}, retry", wals, peerId);
|
||||
setNextState(SyncReplicationReplayWALState.ASSIGN_WORKER);
|
||||
return Flow.HAS_MORE_STATE;
|
||||
}
|
||||
return Flow.NO_MORE_STATE;
|
||||
default:
|
||||
throw new UnsupportedOperationException("unhandled state=" + state);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void rollbackState(MasterProcedureEnv env,
|
||||
SyncReplicationReplayWALState state)
|
||||
throws IOException, InterruptedException {
|
||||
if (state == getInitialState()) {
|
||||
return;
|
||||
}
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected SyncReplicationReplayWALState getState(int state) {
|
||||
return SyncReplicationReplayWALState.forNumber(state);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected int getStateId(
|
||||
SyncReplicationReplayWALState state) {
|
||||
return state.getNumber();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected SyncReplicationReplayWALState getInitialState() {
|
||||
return SyncReplicationReplayWALState.ASSIGN_WORKER;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void serializeStateData(ProcedureStateSerializer serializer)
|
||||
throws IOException {
|
||||
SyncReplicationReplayWALStateData.Builder builder =
|
||||
SyncReplicationReplayWALStateData.newBuilder();
|
||||
builder.setPeerId(peerId);
|
||||
wals.stream().forEach(builder::addWal);
|
||||
serializer.serialize(builder.build());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
|
||||
SyncReplicationReplayWALStateData data =
|
||||
serializer.deserialize(SyncReplicationReplayWALStateData.class);
|
||||
peerId = data.getPeerId();
|
||||
wals = new ArrayList<>();
|
||||
data.getWalList().forEach(wals::add);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getPeerId() {
|
||||
return peerId;
|
||||
}
|
||||
|
||||
@Override
|
||||
public PeerOperationType getPeerOperationType() {
|
||||
return PeerOperationType.SYNC_REPLICATION_REPLAY_WAL;
|
||||
}
|
||||
}
|
|
@ -18,7 +18,8 @@
|
|||
package org.apache.hadoop.hbase.master.replication;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
|
||||
|
@ -40,22 +41,20 @@ import org.slf4j.LoggerFactory;
|
|||
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.ReplaySyncReplicationWALParameter;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.ReplaySyncReplicationWALStateData;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.SyncReplicationReplayWALRemoteStateData;
|
||||
|
||||
@InterfaceAudience.Private
|
||||
public class ReplaySyncReplicationWALProcedure extends Procedure<MasterProcedureEnv>
|
||||
public class SyncReplicationReplayWALRemoteProcedure extends Procedure<MasterProcedureEnv>
|
||||
implements RemoteProcedure<MasterProcedureEnv, ServerName>, PeerProcedureInterface {
|
||||
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(ReplaySyncReplicationWALProcedure.class);
|
||||
|
||||
private static final long DEFAULT_WAIT_AVAILABLE_SERVER_TIMEOUT = 10000;
|
||||
LoggerFactory.getLogger(SyncReplicationReplayWALRemoteProcedure.class);
|
||||
|
||||
private String peerId;
|
||||
|
||||
private ServerName targetServer = null;
|
||||
private ServerName targetServer;
|
||||
|
||||
private String wal;
|
||||
private List<String> wals;
|
||||
|
||||
private boolean dispatched;
|
||||
|
||||
|
@ -63,19 +62,24 @@ public class ReplaySyncReplicationWALProcedure extends Procedure<MasterProcedure
|
|||
|
||||
private boolean succ;
|
||||
|
||||
public ReplaySyncReplicationWALProcedure() {
|
||||
public SyncReplicationReplayWALRemoteProcedure() {
|
||||
}
|
||||
|
||||
public ReplaySyncReplicationWALProcedure(String peerId, String wal) {
|
||||
public SyncReplicationReplayWALRemoteProcedure(String peerId, List<String> wals,
|
||||
ServerName targetServer) {
|
||||
this.peerId = peerId;
|
||||
this.wal = wal;
|
||||
this.wals = wals;
|
||||
this.targetServer = targetServer;
|
||||
}
|
||||
|
||||
@Override
|
||||
public RemoteOperation remoteCallBuild(MasterProcedureEnv env, ServerName remote) {
|
||||
ReplaySyncReplicationWALParameter.Builder builder =
|
||||
ReplaySyncReplicationWALParameter.newBuilder();
|
||||
builder.setPeerId(peerId);
|
||||
wals.stream().forEach(builder::addWal);
|
||||
return new ServerOperation(this, getProcId(), ReplaySyncReplicationWALCallable.class,
|
||||
ReplaySyncReplicationWALParameter.newBuilder().setPeerId(peerId).setWal(wal).build()
|
||||
.toByteArray());
|
||||
builder.build().toByteArray());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -99,22 +103,47 @@ public class ReplaySyncReplicationWALProcedure extends Procedure<MasterProcedure
|
|||
getProcId());
|
||||
return;
|
||||
}
|
||||
ReplaySyncReplicationWALManager replaySyncReplicationWALManager =
|
||||
env.getMasterServices().getReplaySyncReplicationWALManager();
|
||||
if (error != null) {
|
||||
LOG.warn("Replay sync replication wal {} on {} failed for peer id={}", wal, targetServer,
|
||||
peerId, error);
|
||||
LOG.warn("Replay wals {} on {} failed for peer id={}", wals, targetServer, peerId, error);
|
||||
this.succ = false;
|
||||
} else {
|
||||
LOG.warn("Replay sync replication wal {} on {} suceeded for peer id={}", wal, targetServer,
|
||||
peerId);
|
||||
truncateWALs(env);
|
||||
LOG.info("Replay wals {} on {} succeed for peer id={}", wals, targetServer, peerId);
|
||||
this.succ = true;
|
||||
replaySyncReplicationWALManager.addAvailServer(peerId, targetServer);
|
||||
}
|
||||
event.wake(env.getProcedureScheduler());
|
||||
event = null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Only truncate wals one by one when task succeed. The parent procedure will check the first
|
||||
* wal length to know whether this task succeed.
|
||||
*/
|
||||
private void truncateWALs(MasterProcedureEnv env) {
|
||||
String firstWal = wals.get(0);
|
||||
try {
|
||||
env.getMasterServices().getSyncReplicationReplayWALManager().finishReplayWAL(firstWal);
|
||||
} catch (IOException e) {
|
||||
// As it is idempotent to rerun this task. Just ignore this exception and return.
|
||||
LOG.warn("Failed to truncate wal {} for peer id={}", firstWal, peerId, e);
|
||||
return;
|
||||
}
|
||||
for (int i = 1; i < wals.size(); i++) {
|
||||
String wal = wals.get(i);
|
||||
try {
|
||||
env.getMasterServices().getSyncReplicationReplayWALManager().finishReplayWAL(wal);
|
||||
} catch (IOException e1) {
|
||||
try {
|
||||
// retry
|
||||
env.getMasterServices().getSyncReplicationReplayWALManager().finishReplayWAL(wal);
|
||||
} catch (IOException e2) {
|
||||
// As the parent procedure only check the first wal length. Just ignore this exception.
|
||||
LOG.warn("Failed to truncate wal {} for peer id={}", wal, peerId, e2);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Procedure<MasterProcedureEnv>[] execute(MasterProcedureEnv env)
|
||||
throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
|
||||
|
@ -126,25 +155,14 @@ public class ReplaySyncReplicationWALProcedure extends Procedure<MasterProcedure
|
|||
dispatched = false;
|
||||
}
|
||||
|
||||
// Try poll a available server
|
||||
if (targetServer == null) {
|
||||
targetServer = env.getMasterServices().getReplaySyncReplicationWALManager()
|
||||
.getAvailServer(peerId, DEFAULT_WAIT_AVAILABLE_SERVER_TIMEOUT, TimeUnit.MILLISECONDS);
|
||||
if (targetServer == null) {
|
||||
LOG.info("No available server to replay wal {} for peer id={}, retry", wal, peerId);
|
||||
throw new ProcedureYieldException();
|
||||
}
|
||||
}
|
||||
|
||||
// Dispatch task to target server
|
||||
try {
|
||||
env.getRemoteDispatcher().addOperationToNode(targetServer, this);
|
||||
} catch (FailedRemoteDispatchException e) {
|
||||
LOG.info(
|
||||
"Can not add remote operation for replay wal {} on {} for peer id={}, " +
|
||||
"this usually because the server is already dead, " + "retry",
|
||||
wal, targetServer, peerId, e);
|
||||
targetServer = null;
|
||||
LOG.warn(
|
||||
"Can not add remote operation for replay wals {} on {} for peer id={}, "
|
||||
+ "this usually because the server is already dead, retry",
|
||||
wals, targetServer, peerId);
|
||||
throw new ProcedureYieldException();
|
||||
}
|
||||
dispatched = true;
|
||||
|
@ -164,24 +182,23 @@ public class ReplaySyncReplicationWALProcedure extends Procedure<MasterProcedure
|
|||
}
|
||||
|
||||
@Override
|
||||
protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
|
||||
ReplaySyncReplicationWALStateData.Builder builder =
|
||||
ReplaySyncReplicationWALStateData.newBuilder().setPeerId(peerId).setWal(wal);
|
||||
if (targetServer != null) {
|
||||
builder.setTargetServer(ProtobufUtil.toServerName(targetServer));
|
||||
}
|
||||
protected void serializeStateData(ProcedureStateSerializer serializer)
|
||||
throws IOException {
|
||||
SyncReplicationReplayWALRemoteStateData.Builder builder =
|
||||
SyncReplicationReplayWALRemoteStateData.newBuilder().setPeerId(peerId)
|
||||
.setTargetServer(ProtobufUtil.toServerName(targetServer));
|
||||
wals.stream().forEach(builder::addWal);
|
||||
serializer.serialize(builder.build());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
|
||||
ReplaySyncReplicationWALStateData data =
|
||||
serializer.deserialize(ReplaySyncReplicationWALStateData.class);
|
||||
SyncReplicationReplayWALRemoteStateData data =
|
||||
serializer.deserialize(SyncReplicationReplayWALRemoteStateData.class);
|
||||
peerId = data.getPeerId();
|
||||
wal = data.getWal();
|
||||
if (data.hasTargetServer()) {
|
||||
targetServer = ProtobufUtil.toServerName(data.getTargetServer());
|
||||
}
|
||||
wals = new ArrayList<>();
|
||||
data.getWalList().forEach(wals::add);
|
||||
targetServer = ProtobufUtil.toServerName(data.getTargetServer());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -191,6 +208,6 @@ public class ReplaySyncReplicationWALProcedure extends Procedure<MasterProcedure
|
|||
|
||||
@Override
|
||||
public PeerOperationType getPeerOperationType() {
|
||||
return PeerOperationType.REPLAY_SYNC_REPLICATION_WAL;
|
||||
return PeerOperationType.SYNC_REPLICATION_REPLAY_WAL_REMOTE;
|
||||
}
|
||||
}
|
|
@ -186,8 +186,8 @@ public class TransitPeerSyncReplicationStateProcedure
|
|||
}
|
||||
}
|
||||
|
||||
private void replayRemoteWAL() {
|
||||
addChildProcedure(new RecoverStandbyProcedure(peerId));
|
||||
private void replayRemoteWAL(boolean serial) {
|
||||
addChildProcedure(new RecoverStandbyProcedure(peerId, serial));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -232,7 +232,7 @@ public class TransitPeerSyncReplicationStateProcedure
|
|||
setNextStateAfterRefreshBegin();
|
||||
return Flow.HAS_MORE_STATE;
|
||||
case REPLAY_REMOTE_WAL_IN_PEER:
|
||||
replayRemoteWAL();
|
||||
replayRemoteWAL(env.getReplicationPeerManager().getPeerConfig(peerId).get().isSerial());
|
||||
setNextState(
|
||||
PeerSyncReplicationStateTransitionState.TRANSIT_PEER_NEW_SYNC_REPLICATION_STATE);
|
||||
return Flow.HAS_MORE_STATE;
|
||||
|
|
|
@ -0,0 +1,108 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.master.replication;
|
||||
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationException;
|
||||
import org.apache.hadoop.hbase.replication.ZKReplicationStorageBase;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
|
||||
@InterfaceAudience.Private
|
||||
public class ZKSyncReplicationReplayWALWorkerStorage extends ZKReplicationStorageBase {
|
||||
|
||||
public static final String WORKERS_ZNODE = "zookeeper.znode.sync.replication.replaywal.workers";
|
||||
|
||||
public static final String WORKERS_ZNODE_DEFAULT = "replaywal-workers";
|
||||
|
||||
/**
|
||||
* The name of the znode that contains a list of workers to replay wal.
|
||||
*/
|
||||
private final String workersZNode;
|
||||
|
||||
public ZKSyncReplicationReplayWALWorkerStorage(ZKWatcher zookeeper, Configuration conf) {
|
||||
super(zookeeper, conf);
|
||||
String workersZNodeName = conf.get(WORKERS_ZNODE, WORKERS_ZNODE_DEFAULT);
|
||||
workersZNode = ZNodePaths.joinZNode(replicationZNode, workersZNodeName);
|
||||
}
|
||||
|
||||
private String getPeerNode(String peerId) {
|
||||
return ZNodePaths.joinZNode(workersZNode, peerId);
|
||||
}
|
||||
|
||||
public void addPeer(String peerId) throws ReplicationException {
|
||||
try {
|
||||
ZKUtil.createWithParents(zookeeper, getPeerNode(peerId));
|
||||
} catch (KeeperException e) {
|
||||
throw new ReplicationException(
|
||||
"Failed to add peer id=" + peerId + " to replaywal-workers storage", e);
|
||||
}
|
||||
}
|
||||
|
||||
public void removePeer(String peerId) throws ReplicationException {
|
||||
try {
|
||||
ZKUtil.deleteNodeRecursively(zookeeper, getPeerNode(peerId));
|
||||
} catch (KeeperException e) {
|
||||
throw new ReplicationException(
|
||||
"Failed to remove peer id=" + peerId + " to replaywal-workers storage", e);
|
||||
}
|
||||
}
|
||||
|
||||
private String getPeerWorkerNode(String peerId, ServerName worker) {
|
||||
return ZNodePaths.joinZNode(getPeerNode(peerId), worker.getServerName());
|
||||
}
|
||||
|
||||
public void addPeerWorker(String peerId, ServerName worker) throws ReplicationException {
|
||||
try {
|
||||
ZKUtil.createWithParents(zookeeper, getPeerWorkerNode(peerId, worker));
|
||||
} catch (KeeperException e) {
|
||||
throw new ReplicationException("Failed to add worker=" + worker + " for peer id=" + peerId,
|
||||
e);
|
||||
}
|
||||
}
|
||||
|
||||
public void removePeerWorker(String peerId, ServerName worker) throws ReplicationException {
|
||||
try {
|
||||
ZKUtil.deleteNode(zookeeper, getPeerWorkerNode(peerId, worker));
|
||||
} catch (KeeperException e) {
|
||||
throw new ReplicationException("Failed to remove worker=" + worker + " for peer id=" + peerId,
|
||||
e);
|
||||
}
|
||||
}
|
||||
|
||||
public Set<ServerName> getPeerWorkers(String peerId) throws ReplicationException {
|
||||
try {
|
||||
List<String> children = ZKUtil.listChildrenNoWatch(zookeeper, getPeerNode(peerId));
|
||||
if (children == null) {
|
||||
return new HashSet<>();
|
||||
}
|
||||
return children.stream().map(ServerName::valueOf).collect(Collectors.toSet());
|
||||
} catch (KeeperException e) {
|
||||
throw new ReplicationException("Failed to list workers for peer id=" + peerId, e);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -21,6 +21,8 @@ import java.io.EOFException;
|
|||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.locks.Lock;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
|
@ -32,6 +34,7 @@ import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
|
|||
import org.apache.hadoop.hbase.regionserver.HRegionServer;
|
||||
import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
|
||||
import org.apache.hadoop.hbase.util.FSUtils;
|
||||
import org.apache.hadoop.hbase.util.KeyLocker;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
import org.apache.hadoop.hbase.wal.WAL.Entry;
|
||||
import org.apache.hadoop.hbase.wal.WAL.Reader;
|
||||
|
@ -68,31 +71,28 @@ public class ReplaySyncReplicationWALCallable implements RSProcedureCallable {
|
|||
|
||||
private String peerId;
|
||||
|
||||
private String wal;
|
||||
private List<String> wals = new ArrayList<>();
|
||||
|
||||
private Exception initError;
|
||||
|
||||
private long batchSize;
|
||||
|
||||
private final KeyLocker<String> peersLock = new KeyLocker<>();
|
||||
|
||||
@Override
|
||||
public Void call() throws Exception {
|
||||
if (initError != null) {
|
||||
throw initError;
|
||||
}
|
||||
LOG.info("Received a replay sync replication wal {} event, peerId={}", wal, peerId);
|
||||
LOG.info("Received a replay sync replication wals {} event, peerId={}", wals, peerId);
|
||||
if (rs.getReplicationSinkService() != null) {
|
||||
try (Reader reader = getReader()) {
|
||||
List<Entry> entries = readWALEntries(reader);
|
||||
while (!entries.isEmpty()) {
|
||||
Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> pair = ReplicationProtbufUtil
|
||||
.buildReplicateWALEntryRequest(entries.toArray(new Entry[entries.size()]));
|
||||
ReplicateWALEntryRequest request = pair.getFirst();
|
||||
rs.getReplicationSinkService().replicateLogEntries(request.getEntryList(),
|
||||
pair.getSecond(), request.getReplicationClusterId(),
|
||||
request.getSourceBaseNamespaceDirPath(), request.getSourceHFileArchiveDirPath());
|
||||
// Read next entries.
|
||||
entries = readWALEntries(reader);
|
||||
Lock peerLock = peersLock.acquireLock(wals.get(0));
|
||||
try {
|
||||
for (String wal : wals) {
|
||||
replayWAL(wal);
|
||||
}
|
||||
} finally {
|
||||
peerLock.unlock();
|
||||
}
|
||||
}
|
||||
return null;
|
||||
|
@ -107,7 +107,7 @@ public class ReplaySyncReplicationWALCallable implements RSProcedureCallable {
|
|||
ReplaySyncReplicationWALParameter param =
|
||||
ReplaySyncReplicationWALParameter.parseFrom(parameter);
|
||||
this.peerId = param.getPeerId();
|
||||
this.wal = param.getWal();
|
||||
param.getWalList().forEach(this.wals::add);
|
||||
this.batchSize = rs.getConfiguration().getLong(REPLAY_SYNC_REPLICATION_WAL_BATCH_SIZE,
|
||||
DEFAULT_REPLAY_SYNC_REPLICATION_WAL_BATCH_SIZE);
|
||||
} catch (InvalidProtocolBufferException e) {
|
||||
|
@ -120,7 +120,23 @@ public class ReplaySyncReplicationWALCallable implements RSProcedureCallable {
|
|||
return EventType.RS_REPLAY_SYNC_REPLICATION_WAL;
|
||||
}
|
||||
|
||||
private Reader getReader() throws IOException {
|
||||
private void replayWAL(String wal) throws IOException {
|
||||
try (Reader reader = getReader(wal)) {
|
||||
List<Entry> entries = readWALEntries(reader);
|
||||
while (!entries.isEmpty()) {
|
||||
Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> pair = ReplicationProtbufUtil
|
||||
.buildReplicateWALEntryRequest(entries.toArray(new Entry[entries.size()]));
|
||||
ReplicateWALEntryRequest request = pair.getFirst();
|
||||
rs.getReplicationSinkService().replicateLogEntries(request.getEntryList(),
|
||||
pair.getSecond(), request.getReplicationClusterId(),
|
||||
request.getSourceBaseNamespaceDirPath(), request.getSourceHFileArchiveDirPath());
|
||||
// Read next entries.
|
||||
entries = readWALEntries(reader);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private Reader getReader(String wal) throws IOException {
|
||||
Path path = new Path(rs.getWALRootDir(), wal);
|
||||
long length = rs.getWALFileSystem().getFileStatus(path).getLen();
|
||||
try {
|
||||
|
|
|
@ -41,8 +41,8 @@ import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
|
|||
import org.apache.hadoop.hbase.master.locking.LockManager;
|
||||
import org.apache.hadoop.hbase.master.normalizer.RegionNormalizer;
|
||||
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
|
||||
import org.apache.hadoop.hbase.master.replication.ReplaySyncReplicationWALManager;
|
||||
import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager;
|
||||
import org.apache.hadoop.hbase.master.replication.SyncReplicationReplayWALManager;
|
||||
import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
|
||||
import org.apache.hadoop.hbase.procedure.MasterProcedureManagerHost;
|
||||
import org.apache.hadoop.hbase.procedure2.LockedResource;
|
||||
|
@ -476,7 +476,7 @@ public class MockNoopMasterServices implements MasterServices {
|
|||
}
|
||||
|
||||
@Override
|
||||
public ReplaySyncReplicationWALManager getReplaySyncReplicationWALManager() {
|
||||
public SyncReplicationReplayWALManager getSyncReplicationReplayWALManager() {
|
||||
return null;
|
||||
}
|
||||
}
|
|
@ -78,7 +78,7 @@ public class SyncReplicationTestBase {
|
|||
|
||||
protected static Path REMOTE_WAL_DIR2;
|
||||
|
||||
private static void initTestingUtility(HBaseTestingUtility util, String zkParent) {
|
||||
protected static void initTestingUtility(HBaseTestingUtility util, String zkParent) {
|
||||
util.setZkCluster(ZK_UTIL.getZkCluster());
|
||||
Configuration conf = util.getConfiguration();
|
||||
conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, zkParent);
|
||||
|
@ -102,8 +102,8 @@ public class SyncReplicationTestBase {
|
|||
ZK_UTIL.startMiniZKCluster();
|
||||
initTestingUtility(UTIL1, "/cluster1");
|
||||
initTestingUtility(UTIL2, "/cluster2");
|
||||
UTIL1.startMiniCluster(3);
|
||||
UTIL2.startMiniCluster(3);
|
||||
UTIL1.startMiniCluster(2,3);
|
||||
UTIL2.startMiniCluster(2,3);
|
||||
TableDescriptor td =
|
||||
TableDescriptorBuilder.newBuilder(TABLE_NAME).setColumnFamily(ColumnFamilyDescriptorBuilder
|
||||
.newBuilder(CF).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()).build();
|
||||
|
|
|
@ -0,0 +1,88 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.replication;
|
||||
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.master.MasterFileSystem;
|
||||
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
||||
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@Category({ ReplicationTests.class, LargeTests.class })
|
||||
public class TestSyncReplicationStandbyKillMaster extends SyncReplicationTestBase {
|
||||
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(TestSyncReplicationStandbyKillMaster.class);
|
||||
|
||||
private final long SLEEP_TIME = 2000;
|
||||
|
||||
private final int COUNT = 1000;
|
||||
|
||||
@ClassRule
|
||||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestSyncReplicationStandbyKillMaster.class);
|
||||
|
||||
@Test
|
||||
public void testStandbyKillMaster() throws Exception {
|
||||
MasterFileSystem mfs = UTIL2.getHBaseCluster().getMaster().getMasterFileSystem();
|
||||
Path remoteWALDir = getRemoteWALDir(mfs, PEER_ID);
|
||||
assertFalse(mfs.getWALFileSystem().exists(remoteWALDir));
|
||||
UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
|
||||
SyncReplicationState.STANDBY);
|
||||
assertTrue(mfs.getWALFileSystem().exists(remoteWALDir));
|
||||
UTIL1.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
|
||||
SyncReplicationState.ACTIVE);
|
||||
|
||||
// Disable async replication and write data, then shutdown
|
||||
UTIL1.getAdmin().disableReplicationPeer(PEER_ID);
|
||||
write(UTIL1, 0, COUNT);
|
||||
UTIL1.shutdownMiniCluster();
|
||||
|
||||
Thread t = new Thread(() -> {
|
||||
try {
|
||||
Thread.sleep(SLEEP_TIME);
|
||||
UTIL2.getMiniHBaseCluster().getMaster().stop("Stop master for test");
|
||||
} catch (Exception e) {
|
||||
LOG.error("Failed to stop master", e);
|
||||
}
|
||||
});
|
||||
t.start();
|
||||
|
||||
// Transit standby to DA to replay logs
|
||||
try {
|
||||
UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
|
||||
SyncReplicationState.DOWNGRADE_ACTIVE);
|
||||
} catch (Exception e) {
|
||||
LOG.error("Failed to transit standby cluster to " + SyncReplicationState.DOWNGRADE_ACTIVE);
|
||||
}
|
||||
|
||||
while (UTIL2.getAdmin().getReplicationPeerSyncReplicationState(PEER_ID)
|
||||
!= SyncReplicationState.DOWNGRADE_ACTIVE) {
|
||||
Thread.sleep(SLEEP_TIME);
|
||||
}
|
||||
verify(UTIL2, 0, COUNT);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,119 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.replication;
|
||||
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.master.MasterFileSystem;
|
||||
import org.apache.hadoop.hbase.master.ServerManager;
|
||||
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
||||
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
|
||||
import org.apache.hadoop.hbase.util.JVMClusterUtil;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@Category({ ReplicationTests.class, LargeTests.class })
|
||||
public class TestSyncReplicationStandbyKillRS extends SyncReplicationTestBase {
|
||||
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(TestSyncReplicationStandbyKillRS.class);
|
||||
|
||||
private final long SLEEP_TIME = 1000;
|
||||
|
||||
private final int COUNT = 1000;
|
||||
|
||||
@ClassRule
|
||||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestSyncReplicationStandbyKillRS.class);
|
||||
|
||||
@Test
|
||||
public void testStandbyKillRegionServer() throws Exception {
|
||||
MasterFileSystem mfs = UTIL2.getHBaseCluster().getMaster().getMasterFileSystem();
|
||||
Path remoteWALDir = getRemoteWALDir(mfs, PEER_ID);
|
||||
assertFalse(mfs.getWALFileSystem().exists(remoteWALDir));
|
||||
UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
|
||||
SyncReplicationState.STANDBY);
|
||||
assertTrue(mfs.getWALFileSystem().exists(remoteWALDir));
|
||||
UTIL1.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
|
||||
SyncReplicationState.ACTIVE);
|
||||
|
||||
// Disable async replication and write data, then shutdown
|
||||
UTIL1.getAdmin().disableReplicationPeer(PEER_ID);
|
||||
write(UTIL1, 0, COUNT);
|
||||
UTIL1.shutdownMiniCluster();
|
||||
|
||||
JVMClusterUtil.MasterThread activeMaster = UTIL2.getMiniHBaseCluster().getMasterThread();
|
||||
Thread t = new Thread(() -> {
|
||||
try {
|
||||
List<JVMClusterUtil.RegionServerThread> regionServers =
|
||||
UTIL2.getMiniHBaseCluster().getLiveRegionServerThreads();
|
||||
for (JVMClusterUtil.RegionServerThread rst : regionServers) {
|
||||
ServerName serverName = rst.getRegionServer().getServerName();
|
||||
rst.getRegionServer().stop("Stop RS for test");
|
||||
waitForRSShutdownToStartAndFinish(activeMaster, serverName);
|
||||
JVMClusterUtil.RegionServerThread restarted =
|
||||
UTIL2.getMiniHBaseCluster().startRegionServer();
|
||||
restarted.waitForServerOnline();
|
||||
}
|
||||
} catch (Exception e) {
|
||||
LOG.error("Failed to kill RS", e);
|
||||
}
|
||||
});
|
||||
t.start();
|
||||
|
||||
// Transit standby to DA to replay logs
|
||||
try {
|
||||
UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
|
||||
SyncReplicationState.DOWNGRADE_ACTIVE);
|
||||
} catch (Exception e) {
|
||||
LOG.error("Failed to transit standby cluster to " + SyncReplicationState.DOWNGRADE_ACTIVE);
|
||||
}
|
||||
|
||||
while (UTIL2.getAdmin().getReplicationPeerSyncReplicationState(PEER_ID)
|
||||
!= SyncReplicationState.DOWNGRADE_ACTIVE) {
|
||||
Thread.sleep(SLEEP_TIME);
|
||||
}
|
||||
verify(UTIL2, 0, COUNT);
|
||||
}
|
||||
|
||||
private void waitForRSShutdownToStartAndFinish(JVMClusterUtil.MasterThread activeMaster,
|
||||
ServerName serverName) throws InterruptedException {
|
||||
ServerManager sm = activeMaster.getMaster().getServerManager();
|
||||
// First wait for it to be in dead list
|
||||
while (!sm.getDeadServers().isDeadServer(serverName)) {
|
||||
LOG.debug("Waiting for [" + serverName + "] to be listed as dead in master");
|
||||
Thread.sleep(SLEEP_TIME);
|
||||
}
|
||||
LOG.debug("Server [" + serverName + "] marked as dead, waiting for it to " +
|
||||
"finish dead processing");
|
||||
while (sm.areDeadServersInProgress()) {
|
||||
LOG.debug("Server [" + serverName + "] still being processed, waiting");
|
||||
Thread.sleep(SLEEP_TIME);
|
||||
}
|
||||
LOG.debug("Server [" + serverName + "] done with server shutdown processing");
|
||||
}
|
||||
}
|
|
@ -40,7 +40,7 @@ import org.apache.hadoop.hbase.client.Table;
|
|||
import org.apache.hadoop.hbase.master.HMaster;
|
||||
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
|
||||
import org.apache.hadoop.hbase.master.replication.RecoverStandbyProcedure;
|
||||
import org.apache.hadoop.hbase.master.replication.ReplaySyncReplicationWALManager;
|
||||
import org.apache.hadoop.hbase.master.replication.SyncReplicationReplayWALManager;
|
||||
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
|
||||
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
|
||||
import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogWriter;
|
||||
|
@ -92,7 +92,7 @@ public class TestRecoverStandbyProcedure {
|
|||
|
||||
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
|
||||
|
||||
private static ReplaySyncReplicationWALManager replaySyncReplicationWALManager;
|
||||
private static SyncReplicationReplayWALManager syncReplicationReplayWALManager;
|
||||
|
||||
private static ProcedureExecutor<MasterProcedureEnv> procExec;
|
||||
|
||||
|
@ -107,7 +107,7 @@ public class TestRecoverStandbyProcedure {
|
|||
conf = UTIL.getConfiguration();
|
||||
HMaster master = UTIL.getHBaseCluster().getMaster();
|
||||
fs = master.getMasterFileSystem().getWALFileSystem();
|
||||
replaySyncReplicationWALManager = master.getReplaySyncReplicationWALManager();
|
||||
syncReplicationReplayWALManager = master.getSyncReplicationReplayWALManager();
|
||||
procExec = master.getMasterProcedureExecutor();
|
||||
}
|
||||
|
||||
|
@ -138,7 +138,7 @@ public class TestRecoverStandbyProcedure {
|
|||
@Test
|
||||
public void testRecoverStandby() throws IOException, StreamLacksCapabilityException {
|
||||
setupSyncReplicationWALs();
|
||||
long procId = procExec.submitProcedure(new RecoverStandbyProcedure(PEER_ID));
|
||||
long procId = procExec.submitProcedure(new RecoverStandbyProcedure(PEER_ID, false));
|
||||
ProcedureTestingUtility.waitProcedure(procExec, procId);
|
||||
ProcedureTestingUtility.assertProcNotFailed(procExec, procId);
|
||||
|
||||
|
@ -153,7 +153,7 @@ public class TestRecoverStandbyProcedure {
|
|||
|
||||
private void setupSyncReplicationWALs() throws IOException, StreamLacksCapabilityException {
|
||||
Path peerRemoteWALDir = ReplicationUtils
|
||||
.getPeerRemoteWALDir(replaySyncReplicationWALManager.getRemoteWALDir(), PEER_ID);
|
||||
.getPeerRemoteWALDir(syncReplicationReplayWALManager.getRemoteWALDir(), PEER_ID);
|
||||
if (!fs.exists(peerRemoteWALDir)) {
|
||||
fs.mkdirs(peerRemoteWALDir);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue