HBASE-25151 warmupRegion frustrates registering WALs on the catalog replicationsource

warmupRegion, called by the Master on Region move, will instantiate
the meta WALProvider as part of its action, so the provider is
already created by the time we go to open the hbase:meta Region.
Accommodate the meta WALProvider being already up.

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/AssignRegionHandler.java
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/UnassignRegionHandler.java
 Pass regionInfo. Needed internally.

hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
 Add handling for the case where the meta WALProvider has already been
 instantiated when addCatalogReplicationSource runs.

hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestMetaRegionReplicaReplicationEndpoint.java
 Add a test that moves hbase:meta between servers and verifies
 replication keeps working.
stack 2020-10-03 01:37:45 -07:00
parent de1843ac94
commit 1f6779b2e0
10 changed files with 212 additions and 65 deletions
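
In outline, the fix keeps the catalog ReplicationSource a create-once-per-RegionServer object and, when the meta WALProvider was already brought up by warmupRegion, registers the replication listener directly on the already-created WAL and enqueues its current file. Below is a minimal standalone sketch of the create-once pattern only; the class and member names here are illustrative stand-ins, not the HBase API:

import java.util.concurrent.atomic.AtomicReference;

// Illustrative stand-in for ReplicationSourceManager's create-once handling of the
// catalog replication source; types and names are simplified, not the HBase API.
public class CatalogSourceOnce {
  // Empty until the first hbase:meta open on this server creates the source.
  private final AtomicReference<Object> catalogSource = new AtomicReference<>();

  // "Poor-man's putIfAbsent": the first caller creates the source, later callers
  // (e.g. hbase:meta moving back to this server) get the existing instance.
  public Object addCatalogSource() {
    synchronized (catalogSource) {
      Object existing = catalogSource.get();
      if (existing != null) {
        return existing;
      }
      Object created = new Object(); // stand-in for building a CatalogReplicationSource
      catalogSource.set(created);
      return created;
    }
  }

  public static void main(String[] args) {
    CatalogSourceOnce manager = new CatalogSourceOnce();
    Object first = manager.addCatalogSource();  // hbase:meta opens on this server
    Object second = manager.addCatalogSource(); // hbase:meta comes back later
    System.out.println(first == second);        // true: one source, reused
  }
}

In the ReplicationSourceManager change below, the same pattern is the synchronized block over the catalogReplicationSource AtomicReference; the !instantiate branch then does the extra listener registration and enqueues the current WAL file when the meta WALProvider was created earlier by warmupRegion.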

HRegion.java

@@ -1193,11 +1193,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   private void initializeWarmup(final CancelableProgressable reporter) throws IOException {
     MonitoredTask status = TaskMonitor.get().createStatus("Initializing region " + this);
     // Initialize all the HStores
-    status.setStatus("Warming up all the Stores");
+    status.setStatus("Warmup all stores of " + this.getRegionInfo().getRegionNameAsString());
     try {
       initializeStores(reporter, status, true);
     } finally {
-      status.markComplete("Done warming up.");
+      status.markComplete("Warmed up " + this.getRegionInfo().getRegionNameAsString());
     }
   }
@@ -8130,15 +8130,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       final CancelableProgressable reporter)
       throws IOException {
-    if (info == null) throw new NullPointerException("Passed region info is null");
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("HRegion.Warming up region: " + info);
-    }
+    Objects.requireNonNull(info, "RegionInfo cannot be null");
+    LOG.debug("Warmup {}", info);
     Path rootDir = CommonFSUtils.getRootDir(conf);
     Path tableDir = CommonFSUtils.getTableDir(rootDir, info.getTable());
     FileSystem fs = null;
     if (rsServices != null) {
       fs = rsServices.getFileSystem();
@@ -8146,7 +8141,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     if (fs == null) {
       fs = rootDir.getFileSystem(conf);
     }
     HRegion r = HRegion.newHRegion(tableDir, wal, fs, conf, info, htd, null);
     r.initializeWarmup(reporter);
   }

RSRpcServices.java

@@ -2062,48 +2062,34 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
   }
   /**
-   * Wamrmup a region on this server.
-   *
-   * This method should only be called by Master. It synchrnously opens the region and
+   * Warmup a region on this server.
+   * This method should only be called by Master. It synchronously opens the region and
    * closes the region bringing the most important pages in cache.
-   * <p>
-   *
-   * @param controller the RPC controller
-   * @param request the request
-   * @throws ServiceException
    */
   @Override
   public WarmupRegionResponse warmupRegion(final RpcController controller,
       final WarmupRegionRequest request) throws ServiceException {
     final RegionInfo region = ProtobufUtil.toRegionInfo(request.getRegionInfo());
-    TableDescriptor htd;
     WarmupRegionResponse response = WarmupRegionResponse.getDefaultInstance();
     try {
       checkOpen();
       String encodedName = region.getEncodedName();
       byte[] encodedNameBytes = region.getEncodedNameAsBytes();
       final HRegion onlineRegion = regionServer.getRegion(encodedName);
       if (onlineRegion != null) {
-        LOG.info("Region already online. Skipping warming up " + region);
+        LOG.info("{} is online; skipping warmup", region);
         return response;
       }
-      htd = regionServer.tableDescriptors.get(region.getTable());
+      TableDescriptor htd = regionServer.tableDescriptors.get(region.getTable());
       if (regionServer.getRegionsInTransitionInRS().containsKey(encodedNameBytes)) {
-        LOG.info("Region is in transition. Skipping warmup " + region);
+        LOG.info("{} is in transition; skipping warmup", region);
         return response;
       }
-      LOG.info("Warming up region " + region.getRegionNameAsString());
+      LOG.info("Warmup {}", region.getRegionNameAsString());
       HRegion.warmupHRegion(region, htd, regionServer.getWAL(region),
         regionServer.getConfiguration(), regionServer, null);
     } catch (IOException ie) {
-      LOG.error("Failed warming up region " + region.getRegionNameAsString(), ie);
+      LOG.error("Failed warmup of {}", region.getRegionNameAsString(), ie);
       throw new ServiceException(ie);
     }

AssignRegionHandler.java

@ -138,7 +138,8 @@ public class AssignRegionHandler extends EventHandler {
if (ServerRegionReplicaUtil.isMetaRegionReplicaReplicationEnabled(conf, tn)) { if (ServerRegionReplicaUtil.isMetaRegionReplicaReplicationEnabled(conf, tn)) {
if (RegionReplicaUtil.isDefaultReplica(this.regionInfo.getReplicaId())) { if (RegionReplicaUtil.isDefaultReplica(this.regionInfo.getReplicaId())) {
// Add the hbase:meta replication source on replica zero/default. // Add the hbase:meta replication source on replica zero/default.
rs.getReplicationSourceService().getReplicationManager().addCatalogReplicationSource(); rs.getReplicationSourceService().getReplicationManager().
addCatalogReplicationSource(this.regionInfo);
} }
} }
region = HRegion.openHRegion(regionInfo, htd, rs.getWAL(regionInfo), conf, rs, null); region = HRegion.openHRegion(regionInfo, htd, rs.getWAL(regionInfo), conf, rs, null);

UnassignRegionHandler.java

@ -126,7 +126,8 @@ public class UnassignRegionHandler extends EventHandler {
if (RegionReplicaUtil.isDefaultReplica(region.getRegionInfo().getReplicaId())) { if (RegionReplicaUtil.isDefaultReplica(region.getRegionInfo().getReplicaId())) {
// If hbase:meta read replicas enabled, remove replication source for hbase:meta Regions. // If hbase:meta read replicas enabled, remove replication source for hbase:meta Regions.
// See assign region handler where we add the replication source on open. // See assign region handler where we add the replication source on open.
rs.getReplicationSourceService().getReplicationManager().removeCatalogReplicationSource(); rs.getReplicationSourceService().getReplicationManager().
removeCatalogReplicationSource(region.getRegionInfo());
} }
} }
if (!rs.reportRegionStateTransition( if (!rs.reportRegionStateTransition(

ReplicationSinkServiceImpl.java (new file)

@@ -0,0 +1,118 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.regionserver.ReplicationSinkService;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSink;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALProvider;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
@InterfaceAudience.Private
public class ReplicationSinkServiceImpl implements ReplicationSinkService {
  private static final Logger LOG = LoggerFactory.getLogger(ReplicationSinkServiceImpl.class);

  private Configuration conf;

  private Server server;

  private ReplicationSink replicationSink;

  // ReplicationLoad to access replication metrics
  private ReplicationLoad replicationLoad;

  private int statsPeriodInSecond;

  @Override
  public void replicateLogEntries(List<AdminProtos.WALEntry> entries, CellScanner cells,
      String replicationClusterId, String sourceBaseNamespaceDirPath,
      String sourceHFileArchiveDirPath) throws IOException {
    this.replicationSink.replicateEntries(entries, cells, replicationClusterId,
      sourceBaseNamespaceDirPath, sourceHFileArchiveDirPath);
  }

  @Override
  public void initialize(Server server, FileSystem fs, Path logdir, Path oldLogDir,
      WALFactory walFactory) throws IOException {
    this.server = server;
    this.conf = server.getConfiguration();
    this.statsPeriodInSecond =
      this.conf.getInt("replication.stats.thread.period.seconds", 5 * 60);
    this.replicationLoad = new ReplicationLoad();
  }

  @Override
  public void startReplicationService() throws IOException {
    this.replicationSink = new ReplicationSink(this.conf);
    this.server.getChoreService().scheduleChore(
      new ReplicationStatisticsChore("ReplicationSinkStatistics", server,
        (int) TimeUnit.SECONDS.toMillis(statsPeriodInSecond)));
  }

  @Override
  public void stopReplicationService() {
    if (this.replicationSink != null) {
      this.replicationSink.stopReplicationSinkServices();
    }
  }

  @Override
  public ReplicationLoad refreshAndGetReplicationLoad() {
    if (replicationLoad == null) {
      return null;
    }
    // always build for latest data
    replicationLoad.buildReplicationLoad(Collections.emptyList(), replicationSink.getSinkMetrics());
    return replicationLoad;
  }

  private final class ReplicationStatisticsChore extends ScheduledChore {

    ReplicationStatisticsChore(String name, Stoppable stopper, int period) {
      super(name, stopper, period);
    }

    @Override
    protected void chore() {
      printStats(replicationSink.getStats());
    }

    private void printStats(String stats) {
      if (!stats.isEmpty()) {
        LOG.info(stats);
      }
    }
  }
}

RegionReplicaReplicationEndpoint.java

@@ -444,11 +444,22 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
       try {
         locations = RegionReplicaReplayCallable
           .getRegionLocations(connection, tableName, row, useCache, 0);
         if (locations == null) {
           throw new HBaseIOException("Cannot locate locations for "
             + tableName + ", row:" + Bytes.toStringBinary(row));
         }
+        // Replicas can take a while to come online. The cache may have only the primary. If we
+        // keep going to the cache, we will not learn of the replicas and their locations after
+        // they come online.
+        if (useCache && locations.size() == 1 && TableName.isMetaTableName(tableName)) {
+          if (tableDescriptors.get(tableName).getRegionReplication() > 1) {
+            // Make an obnoxious log here. See how bad this issue is. Add a timer if happening
+            // too much.
+            LOG.info("Skipping location cache; only one location found for {}", tableName);
+            useCache = false;
+            continue;
+          }
+        }
       } catch (TableNotFoundException e) {
         if (LOG.isTraceEnabled()) {
           LOG.trace("Skipping " + entries.size() + " entries because table " + tableName
@@ -467,7 +478,7 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
         // entry is not coming from the primary region, filter it out.
         HRegionLocation primaryLocation = locations.getDefaultRegionLocation();
         if (!Bytes.equals(primaryLocation.getRegionInfo().getEncodedNameAsBytes(),
-            encodedRegionName)) {
+          encodedRegionName)) {
           if (useCache) {
             useCache = false;
             continue; // this will retry location lookup

ReplicationSourceManager.java

@@ -50,6 +50,8 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
 import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationListener;
@@ -63,6 +65,7 @@ import org.apache.hadoop.hbase.replication.ReplicationTracker;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
+import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.wal.WALFactory;
 import org.apache.hadoop.hbase.wal.WALProvider;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -343,7 +346,7 @@ public class ReplicationSourceManager implements ReplicationListener {
   /**
    * @return a new 'classic' user-space replication source.
    * @param queueId the id of the replication queue to associate the ReplicationSource with.
-   * @see #createCatalogReplicationSource() for creating a ReplicationSource for hbase:meta.
+   * @see #createCatalogReplicationSource(RegionInfo) for creating a ReplicationSource for meta.
    */
   private ReplicationSourceInterface createSource(String queueId, ReplicationPeer replicationPeer)
       throws IOException {
@@ -985,57 +988,65 @@ public class ReplicationSourceManager implements ReplicationListener {
   /**
    * Add an hbase:meta Catalog replication source. Called on open of an hbase:meta Region.
-   * @see #removeCatalogReplicationSource()
+   * Create it once only. If exists already, use the existing one.
+   * @see #removeCatalogReplicationSource(RegionInfo)
+   * @see #addSource(String) This is specialization on the addSource method.
    */
-  public ReplicationSourceInterface addCatalogReplicationSource() throws IOException {
-    // Open/Create the hbase:meta ReplicationSource once only.
-    // Poor-man's putIfAbsent
+  public ReplicationSourceInterface addCatalogReplicationSource(RegionInfo regionInfo)
+      throws IOException {
     synchronized (this.catalogReplicationSource) {
       ReplicationSourceInterface rs = this.catalogReplicationSource.get();
       return rs != null ? rs :
-        this.catalogReplicationSource.getAndSet(createCatalogReplicationSource());
+        this.catalogReplicationSource.getAndSet(createCatalogReplicationSource(regionInfo));
     }
   }
   /**
    * Remove the hbase:meta Catalog replication source.
    * Called when we close hbase:meta.
-   * @see #addCatalogReplicationSource()
+   * @see #addCatalogReplicationSource(RegionInfo regionInfo)
    */
-  public void removeCatalogReplicationSource() {
+  public void removeCatalogReplicationSource(RegionInfo regionInfo) {
     // Nothing to do. Leave any CatalogReplicationSource in place in case an hbase:meta Region
     // comes back to this server.
   }
   /**
    * Create, initialize, and start the Catalog ReplicationSource.
+   * Presumes called one-time only (caller must ensure one-time only call).
+   * @see #addSource(String) This is a specialization of the addSource call.
    */
-  private ReplicationSourceInterface createCatalogReplicationSource() throws IOException {
-    // Has the hbase:meta WALProvider been instantiated?
+  private ReplicationSourceInterface createCatalogReplicationSource(RegionInfo regionInfo)
+      throws IOException {
+    // Instantiate meta walProvider. Instantiated here or over in the #warmupRegion call made by the
+    // Master on a 'move' operation. Need to do extra work if we did NOT instantiate the provider.
     WALProvider walProvider = this.walFactory.getMetaWALProvider();
-    boolean addListener = false;
-    if (walProvider == null) {
-      // The meta walProvider has not been instantiated. Create it.
+    boolean instantiate = walProvider == null;
+    if (instantiate) {
       walProvider = this.walFactory.getMetaProvider();
-      addListener = true;
     }
     CatalogReplicationSourcePeer peer = new CatalogReplicationSourcePeer(this.conf,
       this.clusterId.toString(), "meta_" + ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_PEER);
     final ReplicationSourceInterface crs = new CatalogReplicationSource();
     crs.init(conf, fs, this, new NoopReplicationQueueStorage(), peer, server, peer.getId(),
       clusterId, walProvider.getWALFileLengthProvider(), new MetricsSource(peer.getId()));
-    if (addListener) {
-      walProvider.addWALActionsListener(new WALActionsListener() {
-        @Override
-        public void postLogRoll(Path oldPath, Path newPath) throws IOException {
-          crs.enqueueLog(newPath);
-        }
-      });
-    } else {
-      // This is a problem. We'll have a ReplicationSource but no listener on hbase:meta WALs
-      // so nothing will be replicated.
-      LOG.error("Did not install WALActionsListener creating CatalogReplicationSource!");
+    // Add listener on the provider so we can pick up the WAL to replicate on roll.
+    WALActionsListener listener = new WALActionsListener() {
+      @Override public void postLogRoll(Path oldPath, Path newPath) throws IOException {
+        crs.enqueueLog(newPath);
+      }
+    };
+    walProvider.addWALActionsListener(listener);
+    if (!instantiate) {
+      // If we did not instantiate provider, need to add our listener on already-created WAL
+      // instance too (listeners are passed by provider to WAL instance on creation but if provider
+      // created already, our listener add above is missed). And add the current WAL file to the
+      // Replication Source so it can start replicating it.
+      WAL wal = walProvider.getWAL(regionInfo);
+      wal.registerWALActionsListener(listener);
+      crs.enqueueLog(((AbstractFSWAL)wal).getCurrentFileName());
     }
+    // Start this ReplicationSource.
     return crs.startup();
   }
 }

WALFactory.java

@@ -289,10 +289,10 @@ public class WALFactory {
   }
   /**
-   * @param region the region which we want to get a WAL for it. Could be null.
+   * @param region the region which we want to get a WAL for. Could be null.
    */
   public WAL getWAL(RegionInfo region) throws IOException {
-    // use different WAL for hbase:meta
+    // Use different WAL for hbase:meta. Instantiates the meta WALProvider if not already up.
     if (region != null && region.isMetaRegion() &&
       region.getReplicaId() == RegionInfo.DEFAULT_REPLICA_ID) {
       return getMetaProvider().getWAL(region);

TestMetaRegionReplicaReplicationEndpoint.java

@@ -109,12 +109,13 @@ public class TestMetaRegionReplicaReplicationEndpoint {
    * Assert that the ReplicationSource for hbase:meta gets created when hbase:meta is opened.
    */
   @Test
-  public void testHBaseMetaReplicationSourceCreatedOnOpen()
-      throws IOException, InterruptedException {
+  public void testHBaseMetaReplicationSourceCreatedOnOpen() throws Exception {
     MiniHBaseCluster cluster = HTU.getMiniHBaseCluster();
     HRegionServer hrs = cluster.getRegionServer(cluster.getServerHoldingMeta());
+    // Replicate a row to prove all working.
+    testHBaseMetaReplicatesOneRow(0);
     assertTrue(isMetaRegionReplicaReplicationSource(hrs));
-    // Now move the hbase:meta and make sure the ReplicationSoruce is in both places.
+    // Now move the hbase:meta and make sure the ReplicationSource is in both places.
     HRegionServer hrsOther = null;
     for (int i = 0; i < cluster.getNumLiveRegionServers(); i++) {
       hrsOther = cluster.getRegionServer(i);
@@ -138,6 +139,26 @@ public class TestMetaRegionReplicaReplicationEndpoint {
     // Assert that there is a ReplicationSource in both places now.
     assertTrue(isMetaRegionReplicaReplicationSource(hrs));
     assertTrue(isMetaRegionReplicaReplicationSource(hrsOther));
+    // Replicate to show stuff still works.
+    testHBaseMetaReplicatesOneRow(1);
+    // Now pretend a few hours have gone by... roll the meta WAL in original location... Move the
+    // meta back and retry replication. See if it works.
+    hrs.getWAL(meta.getRegionInfo()).rollWriter(true);
+    testHBaseMetaReplicatesOneRow(2);
+    hrs.getWAL(meta.getRegionInfo()).rollWriter(true);
+    testHBaseMetaReplicatesOneRow(3);
+  }
+
+  /**
+   * Test meta region replica replication. Create some tables and see if replicas pick up the
+   * additions.
+   */
+  private void testHBaseMetaReplicatesOneRow(int i) throws Exception {
+    waitForMetaReplicasToOnline();
+    try (Table table = HTU.createTable(TableName.valueOf(this.name.getMethodName() + "_" + i),
+      HConstants.CATALOG_FAMILY)) {
+      verifyReplication(TableName.META_TABLE_NAME, NB_SERVERS, getMetaCells(table.getName()));
+    }
   }
   /**

TestReplicationSource.java

@@ -535,7 +535,11 @@ public class TestReplicationSource {
       FlakyReplicationEndpoint.class.getName());
     try {
       rs.startup();
+      assertTrue(rs.isSourceActive());
+      Waiter.waitFor(conf, 1000, () -> FaultyReplicationEndpoint.count > 0);
       Waiter.waitFor(conf, 1000, () -> rss.isAborted());
+      assertTrue(rss.isAborted());
+      Waiter.waitFor(conf, 1000, () -> !rs.isSourceActive());
       assertFalse(rs.isSourceActive());
     } finally {
       rs.terminate("Done");