diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index a7a06f9a4dc..c123faf4b07 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -1193,11 +1193,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
+ /**
+ * Warm up this region: create a MonitoredTask for the work and initialize the stores through
+ * {@code initializeStores(reporter, status, true)}. The task is marked complete in a
+ * {@code finally} block, i.e. whether or not store initialization throws.
+ * @param reporter passed through unchanged to {@code initializeStores}
+ * @throws IOException declared by this method; store initialization is the call made inside
+ *   the try block
+ */
private void initializeWarmup(final CancelableProgressable reporter) throws IOException {
MonitoredTask status = TaskMonitor.get().createStatus("Initializing region " + this);
// Initialize all the HStores
- status.setStatus("Warming up all the Stores");
+ status.setStatus("Warmup all stores of " + this.getRegionInfo().getRegionNameAsString());
try {
initializeStores(reporter, status, true);
} finally {
- status.markComplete("Done warming up.");
+ status.markComplete("Warmed up " + this.getRegionInfo().getRegionNameAsString());
}
}
@@ -8130,15 +8130,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
final CancelableProgressable reporter)
throws IOException {
- if (info == null) throw new NullPointerException("Passed region info is null");
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("HRegion.Warming up region: " + info);
- }
-
+ Objects.requireNonNull(info, "RegionInfo cannot be null");
+ LOG.debug("Warmup {}", info);
Path rootDir = CommonFSUtils.getRootDir(conf);
Path tableDir = CommonFSUtils.getTableDir(rootDir, info.getTable());
-
FileSystem fs = null;
if (rsServices != null) {
fs = rsServices.getFileSystem();
@@ -8146,7 +8141,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
if (fs == null) {
fs = rootDir.getFileSystem(conf);
}
-
HRegion r = HRegion.newHRegion(tableDir, wal, fs, conf, info, htd, null);
r.initializeWarmup(reporter);
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 5836c47a98b..ac79afab9d2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -2062,48 +2062,34 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
}
/**
- * Wamrmup a region on this server.
- *
- * This method should only be called by Master. It synchrnously opens the region and
+ * Warmup a region on this server.
+ * This method should only be called by Master. It synchronously opens the region and
* closes the region bringing the most important pages in cache.
- *
- *
- * @param controller the RPC controller
- * @param request the request
- * @throws ServiceException
*/
@Override
public WarmupRegionResponse warmupRegion(final RpcController controller,
final WarmupRegionRequest request) throws ServiceException {
-
final RegionInfo region = ProtobufUtil.toRegionInfo(request.getRegionInfo());
- TableDescriptor htd;
WarmupRegionResponse response = WarmupRegionResponse.getDefaultInstance();
-
try {
checkOpen();
String encodedName = region.getEncodedName();
byte[] encodedNameBytes = region.getEncodedNameAsBytes();
final HRegion onlineRegion = regionServer.getRegion(encodedName);
-
if (onlineRegion != null) {
- LOG.info("Region already online. Skipping warming up " + region);
+ LOG.info("{} is online; skipping warmup", region);
return response;
}
-
- htd = regionServer.tableDescriptors.get(region.getTable());
-
+ TableDescriptor htd = regionServer.tableDescriptors.get(region.getTable());
if (regionServer.getRegionsInTransitionInRS().containsKey(encodedNameBytes)) {
- LOG.info("Region is in transition. Skipping warmup " + region);
+ LOG.info("{} is in transition; skipping warmup", region);
return response;
}
-
- LOG.info("Warming up region " + region.getRegionNameAsString());
+ LOG.info("Warmup {}", region.getRegionNameAsString());
HRegion.warmupHRegion(region, htd, regionServer.getWAL(region),
regionServer.getConfiguration(), regionServer, null);
-
} catch (IOException ie) {
- LOG.error("Failed warming up region " + region.getRegionNameAsString(), ie);
+ LOG.error("Failed warmup of {}", region.getRegionNameAsString(), ie);
throw new ServiceException(ie);
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/AssignRegionHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/AssignRegionHandler.java
index 4ee6efc13dc..5d9819ca56a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/AssignRegionHandler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/AssignRegionHandler.java
@@ -138,7 +138,8 @@ public class AssignRegionHandler extends EventHandler {
if (ServerRegionReplicaUtil.isMetaRegionReplicaReplicationEnabled(conf, tn)) {
if (RegionReplicaUtil.isDefaultReplica(this.regionInfo.getReplicaId())) {
// Add the hbase:meta replication source on replica zero/default.
- rs.getReplicationSourceService().getReplicationManager().addCatalogReplicationSource();
+ rs.getReplicationSourceService().getReplicationManager().
+ addCatalogReplicationSource(this.regionInfo);
}
}
region = HRegion.openHRegion(regionInfo, htd, rs.getWAL(regionInfo), conf, rs, null);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/UnassignRegionHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/UnassignRegionHandler.java
index 1ed74bb86e0..0d02f30e5ab 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/UnassignRegionHandler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/UnassignRegionHandler.java
@@ -126,7 +126,8 @@ public class UnassignRegionHandler extends EventHandler {
if (RegionReplicaUtil.isDefaultReplica(region.getRegionInfo().getReplicaId())) {
// If hbase:meta read replicas enabled, remove replication source for hbase:meta Regions.
// See assign region handler where we add the replication source on open.
- rs.getReplicationSourceService().getReplicationManager().removeCatalogReplicationSource();
+ rs.getReplicationSourceService().getReplicationManager().
+ removeCatalogReplicationSource(region.getRegionInfo());
}
}
if (!rs.reportRegionStateTransition(
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationSinkServiceImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationSinkServiceImpl.java
new file mode 100644
index 00000000000..edd567914dc
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationSinkServiceImpl.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.ScheduledChore;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.regionserver.ReplicationSinkService;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationSink;
+import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.hadoop.hbase.wal.WALProvider;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
+
+/**
+ * {@link ReplicationSinkService} implementation that applies replicated WAL entries through a
+ * {@link ReplicationSink} and periodically logs the sink's statistics.
+ * <p>
+ * Lifecycle as implemented here: {@code initialize} captures the server and its configuration,
+ * {@link #startReplicationService()} creates the sink and schedules the statistics chore, and
+ * {@link #stopReplicationService()} stops the sink.
+ */
+@InterfaceAudience.Private
+public class ReplicationSinkServiceImpl implements ReplicationSinkService {
+  private static final Logger LOG = LoggerFactory.getLogger(ReplicationSinkServiceImpl.class);
+
+  private Configuration conf;
+
+  private Server server;
+
+  // Created in startReplicationService(); null until then.
+  private ReplicationSink replicationSink;
+
+  // ReplicationLoad to access replication metrics
+  private ReplicationLoad replicationLoad;
+
+  // Period of the statistics chore, read from "replication.stats.thread.period.seconds".
+  private int statsPeriodInSecond;
+
+  /**
+   * Hands the received entries to the {@link ReplicationSink}; every argument is forwarded
+   * unchanged to {@code ReplicationSink#replicateEntries}.
+   * @param entries the WAL entries to replicate
+   * @param cells forwarded to the sink together with {@code entries}
+   * @param replicationClusterId forwarded to the sink
+   * @param sourceBaseNamespaceDirPath forwarded to the sink
+   * @param sourceHFileArchiveDirPath forwarded to the sink
+   * @throws IOException if the sink fails to replicate the entries
+   */
+  @Override
+  public void replicateLogEntries(List<AdminProtos.WALEntry> entries, CellScanner cells,
+      String replicationClusterId, String sourceBaseNamespaceDirPath,
+      String sourceHFileArchiveDirPath) throws IOException {
+    this.replicationSink.replicateEntries(entries, cells, replicationClusterId,
+      sourceBaseNamespaceDirPath, sourceHFileArchiveDirPath);
+  }
+
+  /**
+   * Records the hosting server and its configuration, reads the statistics period (default
+   * {@code 5 * 60} seconds) and creates the {@link ReplicationLoad} holder. The {@code fs},
+   * {@code logdir}, {@code oldLogDir} and {@code walFactory} arguments are not used by this
+   * implementation.
+   */
+  @Override
+  public void initialize(Server server, FileSystem fs, Path logdir, Path oldLogDir,
+      WALFactory walFactory) throws IOException {
+    this.server = server;
+    this.conf = server.getConfiguration();
+    this.statsPeriodInSecond =
+      this.conf.getInt("replication.stats.thread.period.seconds", 5 * 60);
+    this.replicationLoad = new ReplicationLoad();
+  }
+
+  /**
+   * Creates the {@link ReplicationSink} and schedules the statistics chore on the server's
+   * chore service, using the configured period converted to milliseconds.
+   */
+  @Override
+  public void startReplicationService() throws IOException {
+    this.replicationSink = new ReplicationSink(this.conf);
+    this.server.getChoreService().scheduleChore(
+      new ReplicationStatisticsChore("ReplicationSinkStatistics", server,
+        (int) TimeUnit.SECONDS.toMillis(statsPeriodInSecond)));
+  }
+
+  /**
+   * Stops the sink's services; does nothing if the sink was never created.
+   */
+  @Override
+  public void stopReplicationService() {
+    if (this.replicationSink != null) {
+      this.replicationSink.stopReplicationSinkServices();
+    }
+  }
+
+  /**
+   * Rebuilds the replication load from the current sink metrics, passing an empty list as the
+   * first argument of {@code buildReplicationLoad}.
+   * @return the refreshed load, or {@code null} if this service has not been initialized or
+   *   started yet
+   */
+  @Override
+  public ReplicationLoad refreshAndGetReplicationLoad() {
+    // replicationLoad is set in initialize() and replicationSink in startReplicationService();
+    // until both exist there is nothing to report, and dereferencing the sink would throw.
+    if (replicationLoad == null || replicationSink == null) {
+      return null;
+    }
+    // always build for latest data
+    replicationLoad.buildReplicationLoad(Collections.emptyList(), replicationSink.getSinkMetrics());
+    return replicationLoad;
+  }
+
+  /**
+   * Chore that logs the sink's statistics string each period, skipping empty strings.
+   * Not static on purpose: it reads the enclosing instance's {@code replicationSink}.
+   */
+  private final class ReplicationStatisticsChore extends ScheduledChore {
+
+    ReplicationStatisticsChore(String name, Stoppable stopper, int period) {
+      super(name, stopper, period);
+    }
+
+    @Override
+    protected void chore() {
+      printStats(replicationSink.getStats());
+    }
+
+    private void printStats(String stats) {
+      if (!stats.isEmpty()) {
+        LOG.info(stats);
+      }
+    }
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
index 7eca732d583..d5ebbe999d2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
@@ -444,11 +444,22 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
try {
locations = RegionReplicaReplayCallable
.getRegionLocations(connection, tableName, row, useCache, 0);
-
if (locations == null) {
throw new HBaseIOException("Cannot locate locations for "
+ tableName + ", row:" + Bytes.toStringBinary(row));
}
+ // Replicas can take a while to come online. The cache may have only the primary. If we
+ // keep going to the cache, we will not learn of the replicas and their locations after
+ // they come online.
+ if (useCache && locations.size() == 1 && TableName.isMetaTableName(tableName)) {
+ if (tableDescriptors.get(tableName).getRegionReplication() > 1) {
+ // Make an obnoxious log here. See how bad this issue is. Add a timer if happening
+ // too much.
+ LOG.info("Skipping location cache; only one location found for {}", tableName);
+ useCache = false;
+ continue;
+ }
+ }
} catch (TableNotFoundException e) {
if (LOG.isTraceEnabled()) {
LOG.trace("Skipping " + entries.size() + " entries because table " + tableName
@@ -467,7 +478,7 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
// entry is not coming from the primary region, filter it out.
HRegionLocation primaryLocation = locations.getDefaultRegionLocation();
if (!Bytes.equals(primaryLocation.getRegionInfo().getEncodedNameAsBytes(),
- encodedRegionName)) {
+ encodedRegionName)) {
if (useCache) {
useCache = false;
continue; // this will retry location lookup
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index 93730d411c9..860cbd3cb5e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -50,6 +50,8 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationListener;
@@ -63,6 +65,7 @@ import org.apache.hadoop.hbase.replication.ReplicationTracker;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
+import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALProvider;
import org.apache.yetus.audience.InterfaceAudience;
@@ -343,7 +346,7 @@ public class ReplicationSourceManager implements ReplicationListener {
/**
* @return a new 'classic' user-space replication source.
* @param queueId the id of the replication queue to associate the ReplicationSource with.
- * @see #createCatalogReplicationSource() for creating a ReplicationSource for hbase:meta.
+ * @see #createCatalogReplicationSource(RegionInfo) for creating a ReplicationSource for meta.
*/
private ReplicationSourceInterface createSource(String queueId, ReplicationPeer replicationPeer)
throws IOException {
@@ -985,57 +988,65 @@ public class ReplicationSourceManager implements ReplicationListener {
/**
* Add an hbase:meta Catalog replication source. Called on open of an hbase:meta Region.
- * @see #removeCatalogReplicationSource()
+ * Create it once only. If exists already, use the existing one.
+ * @param regionInfo the hbase:meta region being opened; handed to
+ *   {@link #createCatalogReplicationSource(RegionInfo)} when no source exists yet.
+ * @return the catalog replication source: the existing one, or the one created by this call.
+ * @see #removeCatalogReplicationSource(RegionInfo)
+ * @see #addSource(String) This is a specialization of the addSource method.
*/
- public ReplicationSourceInterface addCatalogReplicationSource() throws IOException {
- // Open/Create the hbase:meta ReplicationSource once only.
+ public ReplicationSourceInterface addCatalogReplicationSource(RegionInfo regionInfo)
+ throws IOException {
+ // Poor-man's putIfAbsent
synchronized (this.catalogReplicationSource) {
ReplicationSourceInterface rs = this.catalogReplicationSource.get();
- return rs != null ? rs :
- this.catalogReplicationSource.getAndSet(createCatalogReplicationSource());
+ if (rs == null) {
+ // Store, then return, the new source. getAndSet returns the PREVIOUS value, which is
+ // null on first creation, so it must not be used as the return value here.
+ rs = createCatalogReplicationSource(regionInfo);
+ this.catalogReplicationSource.set(rs);
+ }
+ return rs;
}
}
/**
* Remove the hbase:meta Catalog replication source.
* Called when we close hbase:meta.
- * @see #addCatalogReplicationSource()
+ * Currently a deliberate no-op: any existing source is left in place in case an hbase:meta
+ * Region comes back to this server.
+ * @param regionInfo the hbase:meta region being closed; not used while this is a no-op.
+ * @see #addCatalogReplicationSource(RegionInfo)
*/
- public void removeCatalogReplicationSource() {
+ public void removeCatalogReplicationSource(RegionInfo regionInfo) {
// Nothing to do. Leave any CatalogReplicationSource in place in case an hbase:meta Region
// comes back to this server.
}
/**
* Create, initialize, and start the Catalog ReplicationSource.
+ * Presumes called one-time only (caller must ensure one-time only call).
+ * A WALActionsListener is added on the meta WALProvider so that each rolled-to WAL file is
+ * enqueued on the new source. If the provider already existed, the listener is also registered
+ * on the WAL obtained for {@code regionInfo} and that WAL's current file is enqueued.
+ * NOTE(review): that path casts the WAL to AbstractFSWAL; confirm no other WAL type gets here.
+ * @param regionInfo used to look up the WAL when the meta WALProvider already existed.
+ * @return the result of starting the newly created source.
+ * @see #addSource(String) This is a specialization of the addSource call.
*/
- private ReplicationSourceInterface createCatalogReplicationSource() throws IOException {
- // Has the hbase:meta WALProvider been instantiated?
+ private ReplicationSourceInterface createCatalogReplicationSource(RegionInfo regionInfo)
+ throws IOException {
+ // Instantiate meta walProvider. Instantiated here or over in the #warmupRegion call made by the
+ // Master on a 'move' operation. Need to do extra work if we did NOT instantiate the provider.
WALProvider walProvider = this.walFactory.getMetaWALProvider();
- boolean addListener = false;
- if (walProvider == null) {
- // The meta walProvider has not been instantiated. Create it.
+ boolean instantiate = walProvider == null;
+ if (instantiate) {
walProvider = this.walFactory.getMetaProvider();
- addListener = true;
}
CatalogReplicationSourcePeer peer = new CatalogReplicationSourcePeer(this.conf,
this.clusterId.toString(), "meta_" + ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_PEER);
final ReplicationSourceInterface crs = new CatalogReplicationSource();
crs.init(conf, fs, this, new NoopReplicationQueueStorage(), peer, server, peer.getId(),
clusterId, walProvider.getWALFileLengthProvider(), new MetricsSource(peer.getId()));
- if (addListener) {
- walProvider.addWALActionsListener(new WALActionsListener() {
- @Override
- public void postLogRoll(Path oldPath, Path newPath) throws IOException {
- crs.enqueueLog(newPath);
- }
- });
- } else {
- // This is a problem. We'll have a ReplicationSource but no listener on hbase:meta WALs
- // so nothing will be replicated.
- LOG.error("Did not install WALActionsListener creating CatalogReplicationSource!");
+ // Add listener on the provider so we can pick up the WAL to replicate on roll.
+ WALActionsListener listener = new WALActionsListener() {
+ @Override public void postLogRoll(Path oldPath, Path newPath) throws IOException {
+ crs.enqueueLog(newPath);
+ }
+ };
+ walProvider.addWALActionsListener(listener);
+ if (!instantiate) {
+ // If we did not instantiate provider, need to add our listener on already-created WAL
+ // instance too (listeners are passed by provider to WAL instance on creation but if provider
+ // created already, our listener add above is missed). And add the current WAL file to the
+ // Replication Source so it can start replicating it.
+ WAL wal = walProvider.getWAL(regionInfo);
+ wal.registerWALActionsListener(listener);
+ crs.enqueueLog(((AbstractFSWAL)wal).getCurrentFileName());
}
- // Start this ReplicationSource.
return crs.startup();
}
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java
index 7b8b3a01e32..97476734ef3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java
@@ -289,10 +289,10 @@ public class WALFactory {
}
/**
- * @param region the region which we want to get a WAL for it. Could be null.
+ * @param region the region which we want to get a WAL for. Could be null.
*/
public WAL getWAL(RegionInfo region) throws IOException {
- // use different WAL for hbase:meta
+ // Use different WAL for hbase:meta. Instantiates the meta WALProvider if not already up.
if (region != null && region.isMetaRegion() &&
region.getReplicaId() == RegionInfo.DEFAULT_REPLICA_ID) {
return getMetaProvider().getWAL(region);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestMetaRegionReplicaReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestMetaRegionReplicaReplicationEndpoint.java
index 34432e43a71..3cefce59d28 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestMetaRegionReplicaReplicationEndpoint.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestMetaRegionReplicaReplicationEndpoint.java
@@ -109,12 +109,13 @@ public class TestMetaRegionReplicaReplicationEndpoint {
* Assert that the ReplicationSource for hbase:meta gets created when hbase:meta is opened.
*/
@Test
- public void testHBaseMetaReplicationSourceCreatedOnOpen()
- throws IOException, InterruptedException {
+ public void testHBaseMetaReplicationSourceCreatedOnOpen() throws Exception {
MiniHBaseCluster cluster = HTU.getMiniHBaseCluster();
HRegionServer hrs = cluster.getRegionServer(cluster.getServerHoldingMeta());
+ // Replicate a row to prove all working.
+ testHBaseMetaReplicatesOneRow(0);
assertTrue(isMetaRegionReplicaReplicationSource(hrs));
- // Now move the hbase:meta and make sure the ReplicationSoruce is in both places.
+ // Now move the hbase:meta and make sure the ReplicationSource is in both places.
HRegionServer hrsOther = null;
for (int i = 0; i < cluster.getNumLiveRegionServers(); i++) {
hrsOther = cluster.getRegionServer(i);
@@ -138,6 +139,26 @@ public class TestMetaRegionReplicaReplicationEndpoint {
// Assert that there is a ReplicationSource in both places now.
assertTrue(isMetaRegionReplicaReplicationSource(hrs));
assertTrue(isMetaRegionReplicaReplicationSource(hrsOther));
+ // Replicate to show stuff still works.
+ testHBaseMetaReplicatesOneRow(1);
+ // Now pretend a few hours have gone by... roll the meta WAL in original location... Move the
+ // meta back and retry replication. See if it works.
+ hrs.getWAL(meta.getRegionInfo()).rollWriter(true);
+ testHBaseMetaReplicatesOneRow(2);
+ hrs.getWAL(meta.getRegionInfo()).rollWriter(true);
+ testHBaseMetaReplicatesOneRow(3);
+ }
+
+ /**
+ * Waits for the hbase:meta replicas to come online, creates one table named after the current
+ * test method plus {@code i}, and hands that table's hbase:meta cells to
+ * {@code verifyReplication} together with {@code NB_SERVERS}.
+ * @param i suffix appended to the table name; callers pass distinct values per call.
+ */
+ private void testHBaseMetaReplicatesOneRow(int i) throws Exception {
+ waitForMetaReplicasToOnline();
+ final String tableNameAsString = this.name.getMethodName() + "_" + i;
+ final TableName tableName = TableName.valueOf(tableNameAsString);
+ try (Table created = HTU.createTable(tableName, HConstants.CATALOG_FAMILY)) {
+ verifyReplication(TableName.META_TABLE_NAME, NB_SERVERS, getMetaCells(created.getName()));
+ }
}
/**
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java
index 275ade22a71..c050b86c603 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java
@@ -535,7 +535,11 @@ public class TestReplicationSource {
FlakyReplicationEndpoint.class.getName());
try {
rs.startup();
+ assertTrue(rs.isSourceActive());
+ Waiter.waitFor(conf, 1000, () -> FaultyReplicationEndpoint.count > 0);
Waiter.waitFor(conf, 1000, () -> rss.isAborted());
+ assertTrue(rss.isAborted());
+ Waiter.waitFor(conf, 1000, () -> !rs.isSourceActive());
assertFalse(rs.isSourceActive());
} finally {
rs.terminate("Done");