From 44ca13fe07dc5050a2bc98ccd3f65953f06aaef8 Mon Sep 17 00:00:00 2001 From: Guanghao Zhang Date: Thu, 31 May 2018 20:54:59 +0800 Subject: [PATCH] HBASE-20569 NPE in RecoverStandbyProcedure.execute --- .../src/main/protobuf/MasterProcedure.proto | 28 ++- .../apache/hadoop/hbase/master/HMaster.java | 10 +- .../hadoop/hbase/master/MasterServices.java | 6 +- .../procedure/MasterProcedureScheduler.java | 3 +- .../procedure/PeerProcedureInterface.java | 2 +- .../hbase/master/procedure/PeerQueue.java | 3 +- .../replication/RecoverStandbyProcedure.java | 68 ++++++-- .../replication/RemovePeerProcedure.java | 5 +- ...a => SyncReplicationReplayWALManager.java} | 105 ++++++++--- .../SyncReplicationReplayWALProcedure.java | 164 ++++++++++++++++++ ...cReplicationReplayWALRemoteProcedure.java} | 113 +++++++----- ...nsitPeerSyncReplicationStateProcedure.java | 6 +- ...SyncReplicationReplayWALWorkerStorage.java | 108 ++++++++++++ .../ReplaySyncReplicationWALCallable.java | 46 +++-- .../hbase/master/MockNoopMasterServices.java | 4 +- .../replication/SyncReplicationTestBase.java | 6 +- .../TestSyncReplicationStandbyKillMaster.java | 88 ++++++++++ .../TestSyncReplicationStandbyKillRS.java | 119 +++++++++++++ .../master/TestRecoverStandbyProcedure.java | 10 +- 19 files changed, 752 insertions(+), 142 deletions(-) rename hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/{ReplaySyncReplicationWALManager.java => SyncReplicationReplayWALManager.java} (63%) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/SyncReplicationReplayWALProcedure.java rename hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/{ReplaySyncReplicationWALProcedure.java => SyncReplicationReplayWALRemoteProcedure.java} (61%) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ZKSyncReplicationReplayWALWorkerStorage.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationStandbyKillMaster.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationStandbyKillRS.java diff --git a/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto b/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto index 23ec8f8d6cb..a062e9a8193 100644 --- a/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto +++ b/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto @@ -486,22 +486,34 @@ message TransitPeerSyncReplicationStateStateData { enum RecoverStandbyState { RENAME_SYNC_REPLICATION_WALS_DIR = 1; - INIT_WORKERS = 2; - DISPATCH_TASKS = 3; - SNAPSHOT_SYNC_REPLICATION_WALS_DIR = 4; + REGISTER_PEER_TO_WORKER_STORAGE = 2; + DISPATCH_WALS = 3; + UNREGISTER_PEER_FROM_WORKER_STORAGE = 4; + SNAPSHOT_SYNC_REPLICATION_WALS_DIR = 5; +} + +enum SyncReplicationReplayWALState { + ASSIGN_WORKER = 1; + DISPATCH_WALS_TO_WORKER = 2; + RELEASE_WORKER = 3; } message RecoverStandbyStateData { - required string peer_id = 1; + required bool serial = 1; } -message ReplaySyncReplicationWALStateData { +message SyncReplicationReplayWALStateData { required string peer_id = 1; - required string wal = 2; - optional ServerName target_server = 3; + repeated string wal = 2; +} + +message SyncReplicationReplayWALRemoteStateData { + required string peer_id = 1; + repeated string wal = 2; + required ServerName target_server = 3; } message ReplaySyncReplicationWALParameter { required string peer_id = 1; - required string wal = 2; + repeated string wal = 2; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index 2c23e859e96..dc627523569 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -138,8 +138,8 @@ import org.apache.hadoop.hbase.master.replication.AddPeerProcedure; import org.apache.hadoop.hbase.master.replication.DisablePeerProcedure; import org.apache.hadoop.hbase.master.replication.EnablePeerProcedure; import org.apache.hadoop.hbase.master.replication.RemovePeerProcedure; -import org.apache.hadoop.hbase.master.replication.ReplaySyncReplicationWALManager; import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager; +import org.apache.hadoop.hbase.master.replication.SyncReplicationReplayWALManager; import org.apache.hadoop.hbase.master.replication.TransitPeerSyncReplicationStateProcedure; import org.apache.hadoop.hbase.master.replication.UpdatePeerConfigProcedure; import org.apache.hadoop.hbase.master.snapshot.SnapshotManager; @@ -343,7 +343,7 @@ public class HMaster extends HRegionServer implements MasterServices { // manager of replication private ReplicationPeerManager replicationPeerManager; - private ReplaySyncReplicationWALManager replaySyncReplicationWALManager; + private SyncReplicationReplayWALManager syncReplicationReplayWALManager; // buffer for "fatal error" notices from region servers // in the cluster. This is only used for assisting @@ -754,6 +754,7 @@ public class HMaster extends HRegionServer implements MasterServices { this.splitOrMergeTracker.start(); this.replicationPeerManager = ReplicationPeerManager.create(zooKeeper, conf); + this.syncReplicationReplayWALManager = new SyncReplicationReplayWALManager(this); this.drainingServerTracker = new DrainingServerTracker(zooKeeper, this, this.serverManager); this.drainingServerTracker.start(); @@ -852,7 +853,6 @@ public class HMaster extends HRegionServer implements MasterServices { initializeMemStoreChunkCreator(); this.fileSystemManager = new MasterFileSystem(conf); this.walManager = new MasterWalManager(this); - this.replaySyncReplicationWALManager = new ReplaySyncReplicationWALManager(this); // enable table descriptors cache this.tableDescriptors.setCacheOn(); @@ -3764,7 +3764,7 @@ public class HMaster extends HRegionServer implements MasterServices { } @Override - public ReplaySyncReplicationWALManager getReplaySyncReplicationWALManager() { - return this.replaySyncReplicationWALManager; + public SyncReplicationReplayWALManager getSyncReplicationReplayWALManager() { + return this.syncReplicationReplayWALManager; } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java index 6034ff7284f..7b0c56a924e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java @@ -38,8 +38,8 @@ import org.apache.hadoop.hbase.master.assignment.AssignmentManager; import org.apache.hadoop.hbase.master.locking.LockManager; import org.apache.hadoop.hbase.master.normalizer.RegionNormalizer; import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; -import org.apache.hadoop.hbase.master.replication.ReplaySyncReplicationWALManager; import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager; +import org.apache.hadoop.hbase.master.replication.SyncReplicationReplayWALManager; import org.apache.hadoop.hbase.master.snapshot.SnapshotManager; import org.apache.hadoop.hbase.procedure.MasterProcedureManagerHost; import org.apache.hadoop.hbase.procedure2.LockedResource; @@ -462,9 +462,9 @@ public interface MasterServices extends Server { ReplicationPeerManager getReplicationPeerManager(); /** - * Returns the {@link ReplaySyncReplicationWALManager}. + * Returns the {@link SyncReplicationReplayWALManager}. */ - ReplaySyncReplicationWALManager getReplaySyncReplicationWALManager(); + SyncReplicationReplayWALManager getSyncReplicationReplayWALManager(); /** * Update the peerConfig for the specified peer diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java index 14209867df3..8a28b84b816 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java @@ -207,7 +207,8 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { // check if the next procedure is still a child. // if not, remove the rq from the fairq and go back to the xlock state Procedure nextProc = rq.peek(); - if (nextProc != null && !Procedure.haveSameParent(nextProc, pollResult)) { + if (nextProc != null && !Procedure.haveSameParent(nextProc, pollResult) + && nextProc.getRootProcId() != pollResult.getRootProcId()) { removeFromRunQueue(fairq, rq); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/PeerProcedureInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/PeerProcedureInterface.java index 76b5163d307..0195ab9c66a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/PeerProcedureInterface.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/PeerProcedureInterface.java @@ -24,7 +24,7 @@ public interface PeerProcedureInterface { enum PeerOperationType { ADD, REMOVE, ENABLE, DISABLE, UPDATE_CONFIG, REFRESH, TRANSIT_SYNC_REPLICATION_STATE, - RECOVER_STANDBY, REPLAY_SYNC_REPLICATION_WAL + RECOVER_STANDBY, SYNC_REPLICATION_REPLAY_WAL, SYNC_REPLICATION_REPLAY_WAL_REMOTE } String getPeerId(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/PeerQueue.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/PeerQueue.java index 25feb7e3fa0..86d8e4341ab 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/PeerQueue.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/PeerQueue.java @@ -50,6 +50,7 @@ class PeerQueue extends Queue { private static boolean requirePeerExclusiveLock(PeerProcedureInterface proc) { return proc.getPeerOperationType() != PeerOperationType.REFRESH - && proc.getPeerOperationType() != PeerOperationType.REPLAY_SYNC_REPLICATION_WAL; + && proc.getPeerOperationType() != PeerOperationType.SYNC_REPLICATION_REPLAY_WAL + && proc.getPeerOperationType() != PeerOperationType.SYNC_REPLICATION_REPLAY_WAL_REMOTE; } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RecoverStandbyProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RecoverStandbyProcedure.java index 9860774970d..54947429d8b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RecoverStandbyProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RecoverStandbyProcedure.java @@ -18,60 +18,79 @@ package org.apache.hadoop.hbase.master.replication; import java.io.IOException; +import java.util.Arrays; import java.util.List; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; +import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; import org.apache.hadoop.hbase.procedure2.ProcedureYieldException; +import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RecoverStandbyState; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RecoverStandbyStateData; @InterfaceAudience.Private public class RecoverStandbyProcedure extends AbstractPeerProcedure { private static final Logger LOG = LoggerFactory.getLogger(RecoverStandbyProcedure.class); + private boolean serial; + public RecoverStandbyProcedure() { } - public RecoverStandbyProcedure(String peerId) { + public RecoverStandbyProcedure(String peerId, boolean serial) { super(peerId); + this.serial = serial; } @Override protected Flow executeFromState(MasterProcedureEnv env, RecoverStandbyState state) throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException { - ReplaySyncReplicationWALManager replaySyncReplicationWALManager = - env.getMasterServices().getReplaySyncReplicationWALManager(); + SyncReplicationReplayWALManager syncReplicationReplayWALManager = + env.getMasterServices().getSyncReplicationReplayWALManager(); switch (state) { case RENAME_SYNC_REPLICATION_WALS_DIR: try { - replaySyncReplicationWALManager.renameToPeerReplayWALDir(peerId); + syncReplicationReplayWALManager.renameToPeerReplayWALDir(peerId); } catch (IOException e) { LOG.warn("Failed to rename remote wal dir for peer id={}", peerId, e); setFailure("master-recover-standby", e); return Flow.NO_MORE_STATE; } - setNextState(RecoverStandbyState.INIT_WORKERS); + setNextState(RecoverStandbyState.REGISTER_PEER_TO_WORKER_STORAGE); return Flow.HAS_MORE_STATE; - case INIT_WORKERS: - replaySyncReplicationWALManager.initPeerWorkers(peerId); - setNextState(RecoverStandbyState.DISPATCH_TASKS); + case REGISTER_PEER_TO_WORKER_STORAGE: + try { + syncReplicationReplayWALManager.registerPeer(peerId); + } catch (ReplicationException e) { + LOG.warn("Failed to register peer to worker storage for peer id={}, retry", peerId, e); + throw new ProcedureYieldException(); + } + setNextState(RecoverStandbyState.DISPATCH_WALS); return Flow.HAS_MORE_STATE; - case DISPATCH_TASKS: - addChildProcedure(getReplayWALs(replaySyncReplicationWALManager).stream() - .map(wal -> new ReplaySyncReplicationWALProcedure(peerId, - replaySyncReplicationWALManager.removeWALRootPath(wal))) - .toArray(ReplaySyncReplicationWALProcedure[]::new)); + case DISPATCH_WALS: + dispathWals(syncReplicationReplayWALManager); + setNextState(RecoverStandbyState.UNREGISTER_PEER_FROM_WORKER_STORAGE); + return Flow.HAS_MORE_STATE; + case UNREGISTER_PEER_FROM_WORKER_STORAGE: + try { + syncReplicationReplayWALManager.unregisterPeer(peerId); + } catch (ReplicationException e) { + LOG.warn("Failed to unregister peer from worker storage for peer id={}, retry", peerId, + e); + throw new ProcedureYieldException(); + } setNextState(RecoverStandbyState.SNAPSHOT_SYNC_REPLICATION_WALS_DIR); return Flow.HAS_MORE_STATE; case SNAPSHOT_SYNC_REPLICATION_WALS_DIR: try { - replaySyncReplicationWALManager.renameToPeerSnapshotWALDir(peerId); + syncReplicationReplayWALManager.renameToPeerSnapshotWALDir(peerId); } catch (IOException e) { LOG.warn("Failed to cleanup replay wals dir for peer id={}, , retry", peerId, e); throw new ProcedureYieldException(); @@ -82,10 +101,14 @@ public class RecoverStandbyProcedure extends AbstractPeerProcedure getReplayWALs(ReplaySyncReplicationWALManager replaySyncReplicationWALManager) + // TODO: dispatch wals by region server when serial is true and sort wals + private void dispathWals(SyncReplicationReplayWALManager syncReplicationReplayWALManager) throws ProcedureYieldException { try { - return replaySyncReplicationWALManager.getReplayWALsAndCleanUpUnusedFiles(peerId); + List wals = syncReplicationReplayWALManager.getReplayWALsAndCleanUpUnusedFiles(peerId); + addChildProcedure(wals.stream().map(wal -> new SyncReplicationReplayWALProcedure(peerId, + Arrays.asList(syncReplicationReplayWALManager.removeWALRootPath(wal)))) + .toArray(SyncReplicationReplayWALProcedure[]::new)); } catch (IOException e) { LOG.warn("Failed to get replay wals for peer id={}, , retry", peerId, e); throw new ProcedureYieldException(); @@ -111,4 +134,17 @@ public class RecoverStandbyProcedure extends AbstractPeerProcedure> availServers = new HashMap<>(); + private final ZKSyncReplicationReplayWALWorkerStorage workerStorage; - public ReplaySyncReplicationWALManager(MasterServices services) { + private final Map> workers = new HashMap<>(); + + private final Object workerLock = new Object(); + + public SyncReplicationReplayWALManager(MasterServices services) + throws IOException, ReplicationException { this.services = services; this.fs = services.getMasterFileSystem().getWALFileSystem(); this.walRootDir = services.getMasterFileSystem().getWALRootDir(); this.remoteWALDir = new Path(this.walRootDir, ReplicationUtils.REMOTE_WAL_DIR_NAME); + this.workerStorage = new ZKSyncReplicationReplayWALWorkerStorage(services.getZooKeeper(), + services.getConfiguration()); + checkReplayingWALDir(); } + private void checkReplayingWALDir() throws IOException, ReplicationException { + FileStatus[] files = fs.listStatus(remoteWALDir); + for (FileStatus file : files) { + String name = file.getPath().getName(); + if (name.endsWith(REMOTE_WAL_REPLAY_SUFFIX)) { + String peerId = name.substring(0, name.length() - REMOTE_WAL_REPLAY_SUFFIX.length()); + workers.put(peerId, workerStorage.getPeerWorkers(peerId)); + } + } + } + + public void registerPeer(String peerId) throws ReplicationException { + workers.put(peerId, new HashSet<>()); + workerStorage.addPeer(peerId); + } + + public void unregisterPeer(String peerId) throws ReplicationException { + workers.remove(peerId); + workerStorage.removePeer(peerId); + } + + public ServerName getPeerWorker(String peerId) throws ReplicationException { + Optional worker = Optional.empty(); + ServerName workerServer = null; + synchronized (workerLock) { + worker = services.getServerManager().getOnlineServers().keySet().stream() + .filter(server -> !workers.get(peerId).contains(server)).findFirst(); + if (worker.isPresent()) { + workerServer = worker.get(); + workers.get(peerId).add(workerServer); + } + } + if (workerServer != null) { + workerStorage.addPeerWorker(peerId, workerServer); + } + return workerServer; + } + + public void removePeerWorker(String peerId, ServerName worker) throws ReplicationException { + synchronized (workerLock) { + workers.get(peerId).remove(worker); + } + workerStorage.removePeerWorker(peerId, worker); + } public void createPeerRemoteWALDir(String peerId) throws IOException { Path peerRemoteWALDir = getPeerRemoteWALDir(remoteWALDir, peerId); if (!fs.exists(peerRemoteWALDir) && !fs.mkdirs(peerRemoteWALDir)) { @@ -77,23 +132,23 @@ public class ReplaySyncReplicationWALManager { deleteDir(dst, peerId); if (!fs.rename(src, dst)) { throw new IOException( - "Failed to rename dir from " + src + " to " + dst + " for peer id=" + peerId); + "Failed to rename dir from " + src + " to " + dst + " for peer id=" + peerId); } LOG.info("Renamed dir from {} to {} for peer id={}", src, dst, peerId); } else if (!fs.exists(dst)) { throw new IOException( - "Want to rename from " + src + " to " + dst + ", but they both do not exist"); + "Want to rename from " + src + " to " + dst + ", but they both do not exist"); } } public void renameToPeerReplayWALDir(String peerId) throws IOException { rename(getPeerRemoteWALDir(remoteWALDir, peerId), getPeerReplayWALDir(remoteWALDir, peerId), - peerId); + peerId); } public void renameToPeerSnapshotWALDir(String peerId) throws IOException { rename(getPeerReplayWALDir(remoteWALDir, peerId), getPeerSnapshotWALDir(remoteWALDir, peerId), - peerId); + peerId); } public List getReplayWALsAndCleanUpUnusedFiles(String peerId) throws IOException { @@ -103,7 +158,7 @@ public class ReplaySyncReplicationWALManager { Path src = status.getPath(); String srcName = src.getName(); String dstName = - srcName.substring(0, srcName.length() - ReplicationUtils.RENAME_WAL_SUFFIX.length()); + srcName.substring(0, srcName.length() - ReplicationUtils.RENAME_WAL_SUFFIX.length()); FSUtils.renameFile(fs, src, new Path(src.getParent(), dstName)); } List wals = new ArrayList<>(); @@ -140,28 +195,22 @@ public class ReplaySyncReplicationWALManager { deleteDir(getPeerSnapshotWALDir(remoteWALDir, peerId), peerId); } - public void initPeerWorkers(String peerId) { - BlockingQueue servers = new LinkedBlockingQueue<>(); - services.getServerManager().getOnlineServers().keySet() - .forEach(server -> servers.offer(server)); - availServers.put(peerId, servers); - } - - public ServerName getAvailServer(String peerId, long timeout, TimeUnit unit) - throws InterruptedException { - return availServers.get(peerId).poll(timeout, unit); - } - - public void addAvailServer(String peerId, ServerName server) { - availServers.get(peerId).offer(server); - } - public String removeWALRootPath(Path path) { String pathStr = path.toString(); // remove the "/" too. return pathStr.substring(walRootDir.toString().length() + 1); } + public void finishReplayWAL(String wal) throws IOException { + Path walPath = new Path(walRootDir, wal); + fs.truncate(walPath, 0); + } + + public boolean isReplayWALFinished(String wal) throws IOException { + Path walPath = new Path(walRootDir, wal); + return fs.getFileStatus(walPath).getLen() == 0; + } + @VisibleForTesting public Path getRemoteWALDir() { return remoteWALDir; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/SyncReplicationReplayWALProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/SyncReplicationReplayWALProcedure.java new file mode 100644 index 00000000000..26d6a3f78b4 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/SyncReplicationReplayWALProcedure.java @@ -0,0 +1,164 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.replication; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; +import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface; +import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; +import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; +import org.apache.hadoop.hbase.procedure2.ProcedureYieldException; +import org.apache.hadoop.hbase.procedure2.StateMachineProcedure; +import org.apache.hadoop.hbase.replication.ReplicationException; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.SyncReplicationReplayWALState; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.SyncReplicationReplayWALStateData; + +@InterfaceAudience.Private +public class SyncReplicationReplayWALProcedure + extends StateMachineProcedure + implements PeerProcedureInterface { + + private static final Logger LOG = + LoggerFactory.getLogger(SyncReplicationReplayWALProcedure.class); + + private String peerId; + + private ServerName worker = null; + + private List wals; + + public SyncReplicationReplayWALProcedure() { + } + + public SyncReplicationReplayWALProcedure(String peerId, List wals) { + this.peerId = peerId; + this.wals = wals; + } + + @Override protected Flow executeFromState(MasterProcedureEnv env, + SyncReplicationReplayWALState state) + throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException { + SyncReplicationReplayWALManager syncReplicationReplayWALManager = + env.getMasterServices().getSyncReplicationReplayWALManager(); + switch (state) { + case ASSIGN_WORKER: + try { + worker = syncReplicationReplayWALManager.getPeerWorker(peerId); + } catch (ReplicationException e) { + LOG.info("Failed to get worker to replay wals {} for peer id={}, retry", wals, peerId); + throw new ProcedureYieldException(); + } + if (worker == null) { + LOG.info("No worker to replay wals {} for peer id={}, retry", wals, peerId); + setNextState(SyncReplicationReplayWALState.ASSIGN_WORKER); + } else { + setNextState(SyncReplicationReplayWALState.DISPATCH_WALS_TO_WORKER); + } + return Flow.HAS_MORE_STATE; + case DISPATCH_WALS_TO_WORKER: + addChildProcedure(new SyncReplicationReplayWALRemoteProcedure(peerId, wals, worker)); + setNextState(SyncReplicationReplayWALState.RELEASE_WORKER); + return Flow.HAS_MORE_STATE; + case RELEASE_WORKER: + boolean finished = false; + try { + finished = syncReplicationReplayWALManager.isReplayWALFinished(wals.get(0)); + } catch (IOException e) { + LOG.info("Failed to check whether replay wals {} finished for peer id={}", wals, peerId); + throw new ProcedureYieldException(); + } + try { + syncReplicationReplayWALManager.removePeerWorker(peerId, worker); + } catch (ReplicationException e) { + LOG.info("Failed to remove worker for peer id={}, retry", peerId); + throw new ProcedureYieldException(); + } + if (!finished) { + LOG.info("Failed to replay wals {} for peer id={}, retry", wals, peerId); + setNextState(SyncReplicationReplayWALState.ASSIGN_WORKER); + return Flow.HAS_MORE_STATE; + } + return Flow.NO_MORE_STATE; + default: + throw new UnsupportedOperationException("unhandled state=" + state); + } + } + + @Override + protected void rollbackState(MasterProcedureEnv env, + SyncReplicationReplayWALState state) + throws IOException, InterruptedException { + if (state == getInitialState()) { + return; + } + throw new UnsupportedOperationException(); + } + + @Override + protected SyncReplicationReplayWALState getState(int state) { + return SyncReplicationReplayWALState.forNumber(state); + } + + @Override + protected int getStateId( + SyncReplicationReplayWALState state) { + return state.getNumber(); + } + + @Override + protected SyncReplicationReplayWALState getInitialState() { + return SyncReplicationReplayWALState.ASSIGN_WORKER; + } + + @Override + protected void serializeStateData(ProcedureStateSerializer serializer) + throws IOException { + SyncReplicationReplayWALStateData.Builder builder = + SyncReplicationReplayWALStateData.newBuilder(); + builder.setPeerId(peerId); + wals.stream().forEach(builder::addWal); + serializer.serialize(builder.build()); + } + + @Override + protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException { + SyncReplicationReplayWALStateData data = + serializer.deserialize(SyncReplicationReplayWALStateData.class); + peerId = data.getPeerId(); + wals = new ArrayList<>(); + data.getWalList().forEach(wals::add); + } + + @Override + public String getPeerId() { + return peerId; + } + + @Override + public PeerOperationType getPeerOperationType() { + return PeerOperationType.SYNC_REPLICATION_REPLAY_WAL; + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplaySyncReplicationWALProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/SyncReplicationReplayWALRemoteProcedure.java similarity index 61% rename from hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplaySyncReplicationWALProcedure.java rename to hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/SyncReplicationReplayWALRemoteProcedure.java index 77fd24dabaf..9f4f33088ad 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplaySyncReplicationWALProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/SyncReplicationReplayWALRemoteProcedure.java @@ -18,7 +18,8 @@ package org.apache.hadoop.hbase.master.replication; import java.io.IOException; -import java.util.concurrent.TimeUnit; +import java.util.ArrayList; +import java.util.List; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; @@ -40,22 +41,20 @@ import org.slf4j.LoggerFactory; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.ReplaySyncReplicationWALParameter; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.ReplaySyncReplicationWALStateData; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.SyncReplicationReplayWALRemoteStateData; @InterfaceAudience.Private -public class ReplaySyncReplicationWALProcedure extends Procedure +public class SyncReplicationReplayWALRemoteProcedure extends Procedure implements RemoteProcedure, PeerProcedureInterface { private static final Logger LOG = - LoggerFactory.getLogger(ReplaySyncReplicationWALProcedure.class); - - private static final long DEFAULT_WAIT_AVAILABLE_SERVER_TIMEOUT = 10000; + LoggerFactory.getLogger(SyncReplicationReplayWALRemoteProcedure.class); private String peerId; - private ServerName targetServer = null; + private ServerName targetServer; - private String wal; + private List wals; private boolean dispatched; @@ -63,19 +62,24 @@ public class ReplaySyncReplicationWALProcedure extends Procedure wals, + ServerName targetServer) { this.peerId = peerId; - this.wal = wal; + this.wals = wals; + this.targetServer = targetServer; } @Override public RemoteOperation remoteCallBuild(MasterProcedureEnv env, ServerName remote) { + ReplaySyncReplicationWALParameter.Builder builder = + ReplaySyncReplicationWALParameter.newBuilder(); + builder.setPeerId(peerId); + wals.stream().forEach(builder::addWal); return new ServerOperation(this, getProcId(), ReplaySyncReplicationWALCallable.class, - ReplaySyncReplicationWALParameter.newBuilder().setPeerId(peerId).setWal(wal).build() - .toByteArray()); + builder.build().toByteArray()); } @Override @@ -99,22 +103,47 @@ public class ReplaySyncReplicationWALProcedure extends Procedure[] execute(MasterProcedureEnv env) throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException { @@ -126,25 +155,14 @@ public class ReplaySyncReplicationWALProcedure extends Procedure(); + data.getWalList().forEach(wals::add); + targetServer = ProtobufUtil.toServerName(data.getTargetServer()); } @Override @@ -191,6 +208,6 @@ public class ReplaySyncReplicationWALProcedure extends Procedure entries = readWALEntries(reader); - while (!entries.isEmpty()) { - Pair pair = ReplicationProtbufUtil - .buildReplicateWALEntryRequest(entries.toArray(new Entry[entries.size()])); - ReplicateWALEntryRequest request = pair.getFirst(); - rs.getReplicationSinkService().replicateLogEntries(request.getEntryList(), - pair.getSecond(), request.getReplicationClusterId(), - request.getSourceBaseNamespaceDirPath(), request.getSourceHFileArchiveDirPath()); - // Read next entries. - entries = readWALEntries(reader); + Lock peerLock = peersLock.acquireLock(wals.get(0)); + try { + for (String wal : wals) { + replayWAL(wal); } + } finally { + peerLock.unlock(); } } return null; @@ -107,7 +107,7 @@ public class ReplaySyncReplicationWALCallable implements RSProcedureCallable { ReplaySyncReplicationWALParameter param = ReplaySyncReplicationWALParameter.parseFrom(parameter); this.peerId = param.getPeerId(); - this.wal = param.getWal(); + param.getWalList().forEach(this.wals::add); this.batchSize = rs.getConfiguration().getLong(REPLAY_SYNC_REPLICATION_WAL_BATCH_SIZE, DEFAULT_REPLAY_SYNC_REPLICATION_WAL_BATCH_SIZE); } catch (InvalidProtocolBufferException e) { @@ -120,7 +120,23 @@ public class ReplaySyncReplicationWALCallable implements RSProcedureCallable { return EventType.RS_REPLAY_SYNC_REPLICATION_WAL; } - private Reader getReader() throws IOException { + private void replayWAL(String wal) throws IOException { + try (Reader reader = getReader(wal)) { + List entries = readWALEntries(reader); + while (!entries.isEmpty()) { + Pair pair = ReplicationProtbufUtil + .buildReplicateWALEntryRequest(entries.toArray(new Entry[entries.size()])); + ReplicateWALEntryRequest request = pair.getFirst(); + rs.getReplicationSinkService().replicateLogEntries(request.getEntryList(), + pair.getSecond(), request.getReplicationClusterId(), + request.getSourceBaseNamespaceDirPath(), request.getSourceHFileArchiveDirPath()); + // Read next entries. + entries = readWALEntries(reader); + } + } + } + + private Reader getReader(String wal) throws IOException { Path path = new Path(rs.getWALRootDir(), wal); long length = rs.getWALFileSystem().getFileStatus(path).getLen(); try { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java index 48d47ea610e..ac20dbd0e2b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java @@ -41,8 +41,8 @@ import org.apache.hadoop.hbase.master.assignment.AssignmentManager; import org.apache.hadoop.hbase.master.locking.LockManager; import org.apache.hadoop.hbase.master.normalizer.RegionNormalizer; import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; -import org.apache.hadoop.hbase.master.replication.ReplaySyncReplicationWALManager; import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager; +import org.apache.hadoop.hbase.master.replication.SyncReplicationReplayWALManager; import org.apache.hadoop.hbase.master.snapshot.SnapshotManager; import org.apache.hadoop.hbase.procedure.MasterProcedureManagerHost; import org.apache.hadoop.hbase.procedure2.LockedResource; @@ -476,7 +476,7 @@ public class MockNoopMasterServices implements MasterServices { } @Override - public ReplaySyncReplicationWALManager getReplaySyncReplicationWALManager() { + public SyncReplicationReplayWALManager getSyncReplicationReplayWALManager() { return null; } } \ No newline at end of file diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java index a20edd3b264..f765139099a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java @@ -78,7 +78,7 @@ public class SyncReplicationTestBase { protected static Path REMOTE_WAL_DIR2; - private static void initTestingUtility(HBaseTestingUtility util, String zkParent) { + protected static void initTestingUtility(HBaseTestingUtility util, String zkParent) { util.setZkCluster(ZK_UTIL.getZkCluster()); Configuration conf = util.getConfiguration(); conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, zkParent); @@ -102,8 +102,8 @@ public class SyncReplicationTestBase { ZK_UTIL.startMiniZKCluster(); initTestingUtility(UTIL1, "/cluster1"); initTestingUtility(UTIL2, "/cluster2"); - UTIL1.startMiniCluster(3); - UTIL2.startMiniCluster(3); + UTIL1.startMiniCluster(2,3); + UTIL2.startMiniCluster(2,3); TableDescriptor td = TableDescriptorBuilder.newBuilder(TABLE_NAME).setColumnFamily(ColumnFamilyDescriptorBuilder .newBuilder(CF).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()).build(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationStandbyKillMaster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationStandbyKillMaster.java new file mode 100644 index 00000000000..6265f5cce7c --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationStandbyKillMaster.java @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.master.MasterFileSystem; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.testclassification.ReplicationTests; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@Category({ ReplicationTests.class, LargeTests.class }) +public class TestSyncReplicationStandbyKillMaster extends SyncReplicationTestBase { + + private static final Logger LOG = + LoggerFactory.getLogger(TestSyncReplicationStandbyKillMaster.class); + + private final long SLEEP_TIME = 2000; + + private final int COUNT = 1000; + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestSyncReplicationStandbyKillMaster.class); + + @Test + public void testStandbyKillMaster() throws Exception { + MasterFileSystem mfs = UTIL2.getHBaseCluster().getMaster().getMasterFileSystem(); + Path remoteWALDir = getRemoteWALDir(mfs, PEER_ID); + assertFalse(mfs.getWALFileSystem().exists(remoteWALDir)); + UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID, + SyncReplicationState.STANDBY); + assertTrue(mfs.getWALFileSystem().exists(remoteWALDir)); + UTIL1.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID, + SyncReplicationState.ACTIVE); + + // Disable async replication and write data, then shutdown + UTIL1.getAdmin().disableReplicationPeer(PEER_ID); + write(UTIL1, 0, COUNT); + UTIL1.shutdownMiniCluster(); + + Thread t = new Thread(() -> { + try { + Thread.sleep(SLEEP_TIME); + UTIL2.getMiniHBaseCluster().getMaster().stop("Stop master for test"); + } catch (Exception e) { + LOG.error("Failed to stop master", e); + } + }); + t.start(); + + // Transit standby to DA to replay logs + try { + UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID, + SyncReplicationState.DOWNGRADE_ACTIVE); + } catch (Exception e) { + LOG.error("Failed to transit standby cluster to " + SyncReplicationState.DOWNGRADE_ACTIVE); + } + + while (UTIL2.getAdmin().getReplicationPeerSyncReplicationState(PEER_ID) + != SyncReplicationState.DOWNGRADE_ACTIVE) { + Thread.sleep(SLEEP_TIME); + } + verify(UTIL2, 0, COUNT); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationStandbyKillRS.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationStandbyKillRS.java new file mode 100644 index 00000000000..3c9724f260e --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationStandbyKillRS.java @@ -0,0 +1,119 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.util.List; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.master.MasterFileSystem; +import org.apache.hadoop.hbase.master.ServerManager; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.testclassification.ReplicationTests; +import org.apache.hadoop.hbase.util.JVMClusterUtil; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@Category({ ReplicationTests.class, LargeTests.class }) +public class TestSyncReplicationStandbyKillRS extends SyncReplicationTestBase { + + private static final Logger LOG = + LoggerFactory.getLogger(TestSyncReplicationStandbyKillRS.class); + + private final long SLEEP_TIME = 1000; + + private final int COUNT = 1000; + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestSyncReplicationStandbyKillRS.class); + + @Test + public void testStandbyKillRegionServer() throws Exception { + MasterFileSystem mfs = UTIL2.getHBaseCluster().getMaster().getMasterFileSystem(); + Path remoteWALDir = getRemoteWALDir(mfs, PEER_ID); + assertFalse(mfs.getWALFileSystem().exists(remoteWALDir)); + UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID, + SyncReplicationState.STANDBY); + assertTrue(mfs.getWALFileSystem().exists(remoteWALDir)); + UTIL1.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID, + SyncReplicationState.ACTIVE); + + // Disable async replication and write data, then shutdown + UTIL1.getAdmin().disableReplicationPeer(PEER_ID); + write(UTIL1, 0, COUNT); + UTIL1.shutdownMiniCluster(); + + JVMClusterUtil.MasterThread activeMaster = UTIL2.getMiniHBaseCluster().getMasterThread(); + Thread t = new Thread(() -> { + try { + List regionServers = + UTIL2.getMiniHBaseCluster().getLiveRegionServerThreads(); + for (JVMClusterUtil.RegionServerThread rst : regionServers) { + ServerName serverName = rst.getRegionServer().getServerName(); + rst.getRegionServer().stop("Stop RS for test"); + waitForRSShutdownToStartAndFinish(activeMaster, serverName); + JVMClusterUtil.RegionServerThread restarted = + UTIL2.getMiniHBaseCluster().startRegionServer(); + restarted.waitForServerOnline(); + } + } catch (Exception e) { + LOG.error("Failed to kill RS", e); + } + }); + t.start(); + + // Transit standby to DA to replay logs + try { + UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID, + SyncReplicationState.DOWNGRADE_ACTIVE); + } catch (Exception e) { + LOG.error("Failed to transit standby cluster to " + SyncReplicationState.DOWNGRADE_ACTIVE); + } + + while (UTIL2.getAdmin().getReplicationPeerSyncReplicationState(PEER_ID) + != SyncReplicationState.DOWNGRADE_ACTIVE) { + Thread.sleep(SLEEP_TIME); + } + verify(UTIL2, 0, COUNT); + } + + private void waitForRSShutdownToStartAndFinish(JVMClusterUtil.MasterThread activeMaster, + ServerName serverName) throws InterruptedException { + ServerManager sm = activeMaster.getMaster().getServerManager(); + // First wait for it to be in dead list + while (!sm.getDeadServers().isDeadServer(serverName)) { + LOG.debug("Waiting for [" + serverName + "] to be listed as dead in master"); + Thread.sleep(SLEEP_TIME); + } + LOG.debug("Server [" + serverName + "] marked as dead, waiting for it to " + + "finish dead processing"); + while (sm.areDeadServersInProgress()) { + LOG.debug("Server [" + serverName + "] still being processed, waiting"); + Thread.sleep(SLEEP_TIME); + } + LOG.debug("Server [" + serverName + "] done with server shutdown processing"); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestRecoverStandbyProcedure.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestRecoverStandbyProcedure.java index 2563669bdfd..d01a0ac61ad 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestRecoverStandbyProcedure.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestRecoverStandbyProcedure.java @@ -40,7 +40,7 @@ import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.master.HMaster; import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; import org.apache.hadoop.hbase.master.replication.RecoverStandbyProcedure; -import org.apache.hadoop.hbase.master.replication.ReplaySyncReplicationWALManager; +import org.apache.hadoop.hbase.master.replication.SyncReplicationReplayWALManager; import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility; import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogWriter; @@ -92,7 +92,7 @@ public class TestRecoverStandbyProcedure { private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); - private static ReplaySyncReplicationWALManager replaySyncReplicationWALManager; + private static SyncReplicationReplayWALManager syncReplicationReplayWALManager; private static ProcedureExecutor procExec; @@ -107,7 +107,7 @@ public class TestRecoverStandbyProcedure { conf = UTIL.getConfiguration(); HMaster master = UTIL.getHBaseCluster().getMaster(); fs = master.getMasterFileSystem().getWALFileSystem(); - replaySyncReplicationWALManager = master.getReplaySyncReplicationWALManager(); + syncReplicationReplayWALManager = master.getSyncReplicationReplayWALManager(); procExec = master.getMasterProcedureExecutor(); } @@ -138,7 +138,7 @@ public class TestRecoverStandbyProcedure { @Test public void testRecoverStandby() throws IOException, StreamLacksCapabilityException { setupSyncReplicationWALs(); - long procId = procExec.submitProcedure(new RecoverStandbyProcedure(PEER_ID)); + long procId = procExec.submitProcedure(new RecoverStandbyProcedure(PEER_ID, false)); ProcedureTestingUtility.waitProcedure(procExec, procId); ProcedureTestingUtility.assertProcNotFailed(procExec, procId); @@ -153,7 +153,7 @@ public class TestRecoverStandbyProcedure { private void setupSyncReplicationWALs() throws IOException, StreamLacksCapabilityException { Path peerRemoteWALDir = ReplicationUtils - .getPeerRemoteWALDir(replaySyncReplicationWALManager.getRemoteWALDir(), PEER_ID); + .getPeerRemoteWALDir(syncReplicationReplayWALManager.getRemoteWALDir(), PEER_ID); if (!fs.exists(peerRemoteWALDir)) { fs.mkdirs(peerRemoteWALDir); }