diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 29121651533..27972403700 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -2473,9 +2473,9 @@ public class HRegionServer extends Thread implements if (ServerRegionReplicaUtil.isDefaultReplica(region.getRegionInfo())) { return; } - if (!ServerRegionReplicaUtil.isRegionReplicaReplicationEnabled(region.conf) || - !ServerRegionReplicaUtil.isRegionReplicaWaitForPrimaryFlushEnabled( - region.conf)) { + TableName tn = region.getTableDescriptor().getTableName(); + if (!ServerRegionReplicaUtil.isRegionReplicaReplicationEnabled(region.conf, tn) || + !ServerRegionReplicaUtil.isRegionReplicaWaitForPrimaryFlushEnabled(region.conf)) { region.setReadsEnabled(true); return; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/AssignRegionHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/AssignRegionHandler.java index 98d09b20e87..4ee6efc13dc 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/AssignRegionHandler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/AssignRegionHandler.java @@ -20,8 +20,11 @@ package org.apache.hadoop.hbase.regionserver.handler; import edu.umd.cs.findbugs.annotations.Nullable; import java.io.IOException; import java.util.concurrent.TimeUnit; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.client.RegionReplicaUtil; import org.apache.hadoop.hbase.client.TableDescriptor; import org.apache.hadoop.hbase.executor.EventHandler; import org.apache.hadoop.hbase.executor.EventType; @@ -31,6 +34,7 @@ import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.RegionServerServices.PostOpenDeployContext; import org.apache.hadoop.hbase.regionserver.RegionServerServices.RegionStateTransitionContext; import org.apache.hadoop.hbase.util.RetryCounter; +import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -129,8 +133,15 @@ public class AssignRegionHandler extends EventHandler { } // pass null for the last parameter, which used to be a CancelableProgressable, as now the // opening can not be interrupted by a close request any more. - region = HRegion.openHRegion(regionInfo, htd, rs.getWAL(regionInfo), rs.getConfiguration(), - rs, null); + Configuration conf = rs.getConfiguration(); + TableName tn = htd.getTableName(); + if (ServerRegionReplicaUtil.isMetaRegionReplicaReplicationEnabled(conf, tn)) { + if (RegionReplicaUtil.isDefaultReplica(this.regionInfo.getReplicaId())) { + // Add the hbase:meta replication source on replica zero/default. 
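+ // Per the ReplicationSourceManager javadoc, the source is created at most once and then lives
+ // for the life of the server even if hbase:meta later moves away; see
+ // ReplicationSourceManager#removeCatalogReplicationSource.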
+ rs.getReplicationSourceService().getReplicationManager().addCatalogReplicationSource(); + } + } + region = HRegion.openHRegion(regionInfo, htd, rs.getWAL(regionInfo), conf, rs, null); } catch (IOException e) { cleanUpAndReportFailure(e); return; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/RegionReplicaFlushHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/RegionReplicaFlushHandler.java index a20443ce646..b5204af5e53 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/RegionReplicaFlushHandler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/RegionReplicaFlushHandler.java @@ -109,9 +109,9 @@ public class RegionReplicaFlushHandler extends EventHandler { RetryCounter counter = new RetryCounterFactory(maxAttempts, (int)pause).create(); if (LOG.isDebugEnabled()) { - LOG.debug("RPC'ing to primary region replica " + - ServerRegionReplicaUtil.getRegionInfoForDefaultReplica(region.getRegionInfo()) + " from " + - region.getRegionInfo() + " to trigger FLUSH"); + LOG.debug("RPC'ing to primary " + ServerRegionReplicaUtil. + getRegionInfoForDefaultReplica(region.getRegionInfo()).getRegionNameAsString() + + " from " + region.getRegionInfo().getRegionNameAsString() + " to trigger FLUSH"); } while (!region.isClosing() && !region.isClosed() && !server.isAborted() && !server.isStopped()) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/UnassignRegionHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/UnassignRegionHandler.java index 0bf2543a445..1ed74bb86e0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/UnassignRegionHandler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/UnassignRegionHandler.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.util.concurrent.TimeUnit; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.client.RegionReplicaUtil; import org.apache.hadoop.hbase.executor.EventHandler; import org.apache.hadoop.hbase.executor.EventType; import org.apache.hadoop.hbase.regionserver.HRegion; @@ -30,6 +31,7 @@ import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.RegionServerServices.RegionStateTransitionContext; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.RetryCounter; +import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -113,12 +115,20 @@ public class UnassignRegionHandler extends EventHandler { if (region.close(abort) == null) { // XXX: Is this still possible? The old comment says about split, but now split is done at // master side, so... - LOG.warn("Can't close {} already closed during close()", regionName); + LOG.warn("Can't close {}, already closed during close()", regionName); rs.getRegionsInTransitionInRS().remove(encodedNameBytes, Boolean.FALSE); return; } rs.removeRegion(region, destination); + if (ServerRegionReplicaUtil.isMetaRegionReplicaReplicationEnabled(rs.getConfiguration(), + region.getTableDescriptor().getTableName())) { + if (RegionReplicaUtil.isDefaultReplica(region.getRegionInfo().getReplicaId())) { + // If hbase:meta read replicas enabled, remove replication source for hbase:meta Regions. 
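+ // Removal is currently a no-op: the source is left in place in case an hbase:meta Region is
+ // later reassigned to this server (see ReplicationSourceManager#removeCatalogReplicationSource).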
+ // See assign region handler where we add the replication source on open. + rs.getReplicationSourceService().getReplicationManager().removeCatalogReplicationSource(); + } + } if (!rs.reportRegionStateTransition( new RegionStateTransitionContext(TransitionCode.CLOSED, HConstants.NO_SEQNUM, closeProcId, -1, region.getRegionInfo()))) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/CatalogReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/CatalogReplicationSource.java new file mode 100644 index 00000000000..f36514d0c21 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/CatalogReplicationSource.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication.regionserver; + +import java.util.Collections; +import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; +import org.apache.yetus.audience.InterfaceAudience; + +/** + * ReplicationSource that reads catalog WAL files -- e.g. hbase:meta WAL files -- and lets through + * all WALEdits from these WALs. This ReplicationSource is NOT created via + * {@link ReplicationSourceFactory}. + */ +@InterfaceAudience.Private +class CatalogReplicationSource extends ReplicationSource { + CatalogReplicationSource() { + // Filters in hbase:meta WAL files and allows all edits, including 'meta' edits (these are + // filtered out in the 'super' class default implementation). + super(p -> AbstractFSWALProvider.isMetaFile(p), Collections.emptyList()); + } + + @Override + public void logPositionAndCleanOldLogs(WALEntryBatch entryBatch) { + // Noop. This implementation does not persist state to backing storage nor does it keep its + // WALs in a general map up in ReplicationSourceManager so just skip calling through to the + // default implementation. + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/CatalogReplicationSourcePeer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/CatalogReplicationSourcePeer.java new file mode 100644 index 00000000000..b853217e688 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/CatalogReplicationSourcePeer.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication.regionserver; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; +import org.apache.hadoop.hbase.replication.ReplicationPeerImpl; +import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil; +import org.apache.yetus.audience.InterfaceAudience; + +/** + * The 'peer' used internally by Catalog Region Replicas Replication Source. + * The Replication system has 'peer' baked into its core so though we do not need 'peering', we + * need a 'peer' and its configuration else the replication system breaks at a few locales. + * Set "hbase.region.replica.catalog.replication" if you want to change the configured endpoint. + */ +@InterfaceAudience.Private +class CatalogReplicationSourcePeer extends ReplicationPeerImpl { + /** + * @param clusterKey Usually the UUID from zk passed in by caller as a String. + */ + CatalogReplicationSourcePeer(Configuration configuration, String clusterKey, String peerId) { + super(configuration, ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_PEER + "_catalog", + true, + ReplicationPeerConfig.newBuilder(). + setClusterKey(clusterKey). + setReplicationEndpointImpl( + configuration.get("hbase.region.replica.catalog.replication", + RegionReplicaReplicationEndpoint.class.getName())). + setBandwidth(0). // '0' means no bandwidth. + setSerial(false). + build()); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/NoopReplicationQueueStorage.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/NoopReplicationQueueStorage.java new file mode 100644 index 00000000000..4ad41fc6983 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/NoopReplicationQueueStorage.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.replication.regionserver; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.SortedSet; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.replication.ReplicationException; +import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.yetus.audience.InterfaceAudience; + +/** + * Noop queue storage -- does nothing. + */ +@InterfaceAudience.Private +class NoopReplicationQueueStorage implements ReplicationQueueStorage { + NoopReplicationQueueStorage() {} + + @Override + public void removeQueue(ServerName serverName, String queueId) throws ReplicationException {} + + @Override + public void addWAL(ServerName serverName, String queueId, String fileName) + throws ReplicationException {} + + @Override + public void removeWAL(ServerName serverName, String queueId, String fileName) + throws ReplicationException { } + + @Override + public void setWALPosition(ServerName serverName, String queueId, String fileName, long position, + Map lastSeqIds) throws ReplicationException {} + + @Override + public long getLastSequenceId(String encodedRegionName, String peerId) + throws ReplicationException { + return 0; + } + + @Override + public void setLastSequenceIds(String peerId, Map lastSeqIds) + throws ReplicationException {} + + @Override + public void removeLastSequenceIds(String peerId) throws ReplicationException {} + + @Override + public void removeLastSequenceIds(String peerId, List encodedRegionNames) + throws ReplicationException {} + + @Override + public long getWALPosition(ServerName serverName, String queueId, String fileName) + throws ReplicationException { + return 0; + } + + @Override + public List getWALsInQueue(ServerName serverName, String queueId) + throws ReplicationException { + return Collections.EMPTY_LIST; + } + + @Override + public List getAllQueues(ServerName serverName) throws ReplicationException { + return Collections.EMPTY_LIST; + } + + @Override + public Pair> claimQueue(ServerName sourceServerName, String queueId, + ServerName destServerName) throws ReplicationException { + return null; + } + + @Override + public void removeReplicatorIfQueueIsEmpty(ServerName serverName) + throws ReplicationException {} + + @Override + public List getListOfReplicators() throws ReplicationException { + return Collections.EMPTY_LIST; + } + + @Override + public Set getAllWALs() throws ReplicationException { + return Collections.EMPTY_SET; + } + + @Override + public void addPeerToHFileRefs(String peerId) throws ReplicationException {} + + @Override + public void removePeerFromHFileRefs(String peerId) throws ReplicationException {} + + @Override + public void addHFileRefs(String peerId, List> pairs) + throws ReplicationException {} + + @Override + public void removeHFileRefs(String peerId, List files) throws ReplicationException {} + + @Override + public List getAllPeersFromHFileRefsQueue() throws ReplicationException { + return Collections.EMPTY_LIST; + } + + @Override + public List getReplicableHFiles(String peerId) throws ReplicationException { + return Collections.EMPTY_LIST; + } + + @Override + public Set getAllHFileRefs() throws ReplicationException { + return Collections.EMPTY_SET; + } + + @Override + public String getRsNode(ServerName serverName) { + return null; + } +} diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java index f9645dc1c18..39c4beb0436 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java index 136dcd40012..4ca2c56d3a4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java @@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.replication.regionserver; import java.io.IOException; import java.util.ArrayList; import java.util.List; -import java.util.OptionalLong; import java.util.UUID; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -124,11 +123,11 @@ public class Replication implements ReplicationSourceService, ReplicationSinkSer } this.globalMetricsSource = CompatibilitySingletonFactory .getInstance(MetricsReplicationSourceFactory.class).getGlobalSource(); - WALProvider walProvider = walFactory.getWALProvider(); this.replicationManager = new ReplicationSourceManager(queueStorage, replicationPeers, - replicationTracker, conf, this.server, fs, logDir, oldLogDir, clusterId, - walProvider != null ? walProvider.getWALFileLengthProvider() : p -> OptionalLong.empty(), - globalMetricsSource); + replicationTracker, conf, this.server, fs, logDir, oldLogDir, clusterId, walFactory, + globalMetricsSource); + // Get the user-space WAL provider + WALProvider walProvider = walFactory != null? 
walFactory.getWALProvider(): null; if (walProvider != null) { walProvider .addWALActionsListener(new ReplicationSourceWALActionListener(conf, replicationManager)); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index 645c14d8510..f30ab29b583 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -65,10 +65,10 @@ import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; import org.apache.hadoop.hbase.wal.WAL.Entry; -import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.hbase.thirdparty.com.google.common.collect.Lists; /** @@ -227,6 +227,7 @@ public class ReplicationSource implements ReplicationSourceInterface { this.peerId = this.replicationQueueInfo.getPeerId(); this.logQueueWarnThreshold = this.conf.getInt("replication.source.log.queue.warn", 2); + // A defaultBandwidth of '0' means no bandwidth; i.e. no throttling. defaultBandwidth = this.conf.getLong("replication.source.per.peer.node.bandwidth", 0); currentBandwidth = getCurrentBandwidth(); this.throttler = new ReplicationThrottler((double) currentBandwidth / 10.0); @@ -378,16 +379,10 @@ public class ReplicationSource implements ReplicationSourceInterface { private void tryStartNewShipper(String walGroupId, PriorityBlockingQueue queue) { workerThreads.compute(walGroupId, (key, value) -> { if (value != null) { - if (LOG.isDebugEnabled()) { - LOG.debug( - "{} Someone has beat us to start a worker thread for wal group {}", - logPeerId(), key); - } + LOG.debug("{} preempted start of shipping worker walGroupId={}", logPeerId(), walGroupId); return value; } else { - if (LOG.isDebugEnabled()) { - LOG.debug("{} Starting up worker for wal group {}", logPeerId(), key); - } + LOG.debug("{} starting shipping worker for walGroupId={}", logPeerId(), walGroupId); ReplicationSourceShipper worker = createNewShipper(walGroupId, queue); ReplicationSourceWALReader walReader = createNewWALReader(walGroupId, queue, worker.getStartPosition()); @@ -532,7 +527,7 @@ public class ReplicationSource implements ReplicationSourceInterface { private long getCurrentBandwidth() { long peerBandwidth = replicationPeer.getPeerBandwidth(); - // user can set peer bandwidth to 0 to use default bandwidth + // User can set peer bandwidth to 0 to use default bandwidth. return peerBandwidth != 0 ? 
peerBandwidth : defaultBandwidth; } @@ -627,11 +622,11 @@ public class ReplicationSource implements ReplicationSourceInterface { this.startupOngoing.set(false); throw new IllegalStateException("Source should be active."); } - LOG.info("{} queueId={} is replicating from cluster={} to cluster={}", - logPeerId(), this.replicationQueueInfo.getQueueId(), clusterId, peerClusterId); - + LOG.info("{} queueId={} (queues={}) is replicating from cluster={} to cluster={}", + logPeerId(), this.replicationQueueInfo.getQueueId(), this.queues.size(), clusterId, + peerClusterId); initializeWALEntryFilter(peerClusterId); - // start workers + // Start workers for (Map.Entry> entry : queues.entrySet()) { String walGroupId = entry.getKey(); PriorityBlockingQueue queue = entry.getValue(); @@ -641,11 +636,10 @@ public class ReplicationSource implements ReplicationSourceInterface { } @Override - public void startup() { + public ReplicationSourceInterface startup() { if (this.sourceRunning) { - return; + return this; } - // Mark we are running now this.sourceRunning = true; startupOngoing.set(true); initThread = new Thread(this::initialize); @@ -673,6 +667,7 @@ public class ReplicationSource implements ReplicationSourceInterface { } } while ((this.startupOngoing.get() || this.retryStartup.get()) && !this.abortOnError); }); + return this; } @Override @@ -851,7 +846,8 @@ public class ReplicationSource implements ReplicationSourceInterface { return server; } - ReplicationQueueStorage getQueueStorage() { + @Override + public ReplicationQueueStorage getReplicationQueueStorage() { return queueStorage; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceFactory.java index d613049d389..8863f141f1a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceFactory.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceFactory.java @@ -19,19 +19,22 @@ package org.apache.hadoop.hbase.replication.regionserver; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.replication.ReplicationQueueInfo; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.hadoop.hbase.replication.ReplicationQueueInfo; /** * Constructs a {@link ReplicationSourceInterface} + * Note, not used to create specialized ReplicationSources + * @see CatalogReplicationSource */ @InterfaceAudience.Private -public class ReplicationSourceFactory { - +public final class ReplicationSourceFactory { private static final Logger LOG = LoggerFactory.getLogger(ReplicationSourceFactory.class); + private ReplicationSourceFactory() {} + static ReplicationSourceInterface create(Configuration conf, String queueId) { ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(queueId); boolean isQueueRecovered = replicationQueueInfo.isQueueRecovered(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java index d287acb9a6b..ca68f9a305e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java +++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java @@ -23,7 +23,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.UUID; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -43,7 +42,6 @@ import org.apache.yetus.audience.InterfaceAudience; */ @InterfaceAudience.Private public interface ReplicationSourceInterface { - /** * Initializer for the source * @param conf the configuration to use @@ -76,7 +74,7 @@ public interface ReplicationSourceInterface { /** * Start the replication */ - void startup(); + ReplicationSourceInterface startup(); /** * End the replication @@ -159,7 +157,6 @@ public interface ReplicationSourceInterface { /** * Try to throttle when the peer config with a bandwidth * @param batchSize entries size will be pushed - * @throws InterruptedException */ void tryThrottle(int batchSize) throws InterruptedException; @@ -191,4 +188,21 @@ public interface ReplicationSourceInterface { default boolean isRecovered() { return false; } + + /** + * @return The instance of queueStorage used by this ReplicationSource. + */ + ReplicationQueueStorage getReplicationQueueStorage(); + + /** + * Log the current position to storage. Also clean old logs from the replication queue. + * Override to bypass the default call to + * {@link ReplicationSourceManager#logPositionAndCleanOldLogs(ReplicationSourceInterface, + * WALEntryBatch)} when the implementation does not need to persist state to backing storage. + * @param entryBatch the wal entry batch we just shipped + */ + default void logPositionAndCleanOldLogs(WALEntryBatch entryBatch) { + getSourceManager().logPositionAndCleanOldLogs(this, entryBatch); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index 4c4b0de66e6..93730d411c9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements.
See the NOTICE file * distributed with this work for additional information @@ -26,6 +26,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.NavigableSet; +import java.util.OptionalLong; import java.util.Set; import java.util.SortedSet; import java.util.TreeSet; @@ -39,6 +40,7 @@ import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -48,6 +50,7 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationListener; import org.apache.hadoop.hbase.replication.ReplicationPeer; @@ -58,12 +61,14 @@ import org.apache.hadoop.hbase.replication.ReplicationQueueInfo; import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; import org.apache.hadoop.hbase.replication.ReplicationTracker; import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil; import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; +import org.apache.hadoop.hbase.wal.WALFactory; +import org.apache.hadoop.hbase.wal.WALProvider; import org.apache.yetus.audience.InterfaceAudience; import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; @@ -120,7 +125,15 @@ public class ReplicationSourceManager implements ReplicationListener { private final ConcurrentMap<String, ReplicationSourceInterface> sources; // List of all the sources we got from died RSs private final List<ReplicationSourceInterface> oldsources; + + /** + * Storage for queues that need persistence; e.g. Replication state so it can be recovered + * after a crash. queueStorage upkeep is spread about this class and passed + * to ReplicationSource instances for these to do updates themselves. Not all ReplicationSource + * instances keep state. + */ private final ReplicationQueueStorage queueStorage; + private final ReplicationTracker replicationTracker; private final ReplicationPeers replicationPeers; // UUID for this cluster @@ -145,7 +158,7 @@ public class ReplicationSourceManager implements ReplicationListener { private final Path logDir; // Path to the wal archive private final Path oldLogDir; - private final WALFileLengthProvider walFileLengthProvider; + private final WALFactory walFactory; // The number of ms that we wait before moving znodes, HBASE-3596 private final long sleepBeforeFailover; // Homemade executer service for replication @@ -159,22 +172,30 @@ public class ReplicationSourceManager implements ReplicationListener { private final long totalBufferLimit; private final MetricsReplicationGlobalSourceSource globalMetrics; + /** + * A special ReplicationSource for hbase:meta Region Read Replicas. + * Usually this reference remains empty. If an hbase:meta Region is opened on this server, we + * will create an instance of a hbase:meta CatalogReplicationSource and it will live the life of + * the Server thereafter; i.e.
we will not shut it down even if the hbase:meta moves away from + * this server (in case it later gets moved back). We synchronize on this instance when testing + * for presence and when creating it so the source is only created and started once. + */ + @VisibleForTesting + AtomicReference<ReplicationSourceInterface> catalogReplicationSource = new AtomicReference<>(); + /** * Creates a replication manager and sets the watch on all the other registered region servers * @param queueStorage the interface for manipulating replication queues - * @param replicationPeers - * @param replicationTracker * @param conf the configuration to use * @param server the server for this region server * @param fs the file system to use * @param logDir the directory that contains all wal directories of live RSs * @param oldLogDir the directory where old logs are archived - * @param clusterId */ public ReplicationSourceManager(ReplicationQueueStorage queueStorage, ReplicationPeers replicationPeers, ReplicationTracker replicationTracker, Configuration conf, Server server, FileSystem fs, Path logDir, Path oldLogDir, UUID clusterId, - WALFileLengthProvider walFileLengthProvider, + WALFactory walFactory, MetricsReplicationGlobalSourceSource globalMetrics) throws IOException { // CopyOnWriteArrayList is thread-safe. // Generally, reading is more than modifying. @@ -193,7 +214,7 @@ public class ReplicationSourceManager implements ReplicationListener { this.sleepBeforeFailover = conf.getLong("replication.sleep.before.failover", 30000); // 30 // seconds this.clusterId = clusterId; - this.walFileLengthProvider = walFileLengthProvider; + this.walFactory = walFactory; this.replicationTracker.registerListener(this); // It's preferable to failover 1 RS at a time, but with good zk servers // more could be processed at the same time. @@ -320,18 +341,21 @@ public class ReplicationSourceManager implements ReplicationListener { } /** - * Factory method to create a replication source - * @param queueId the id of the replication queue - * @return the created source + * @return a new 'classic' user-space replication source. + * @param queueId the id of the replication queue to associate the ReplicationSource with. + * @see #createCatalogReplicationSource() for creating a ReplicationSource for hbase:meta. */ private ReplicationSourceInterface createSource(String queueId, ReplicationPeer replicationPeer) throws IOException { ReplicationSourceInterface src = ReplicationSourceFactory.create(conf, queueId); - - MetricsSource metrics = new MetricsSource(queueId); - // init replication source + // Init the just created replication source. Pass the default walProvider's wal file length + // provider. Presumption is we replicate user-space Tables only. For hbase:meta region replica + // replication, see #createCatalogReplicationSource(). + WALFileLengthProvider walFileLengthProvider = + this.walFactory.getWALProvider() != null? + this.walFactory.getWALProvider().getWALFileLengthProvider() : p -> OptionalLong.empty(); src.init(conf, fs, this, queueStorage, replicationPeer, server, queueId, clusterId, - walFileLengthProvider, metrics); + walFileLengthProvider, new MetricsSource(queueId)); return src; } @@ -518,17 +542,16 @@ public class ReplicationSourceManager implements ReplicationListener { /** * This method will log the current position to storage. And also clean old logs from the * replication queue.
- * @param queueId id of the replication queue - * @param queueRecovered indicates if this queue comes from another region server * @param entryBatch the wal entry batch we just shipped */ - public void logPositionAndCleanOldLogs(String queueId, boolean queueRecovered, + public void logPositionAndCleanOldLogs(ReplicationSourceInterface source, WALEntryBatch entryBatch) { String fileName = entryBatch.getLastWalPath().getName(); + String queueId = source.getQueueId(); interruptOrAbortWhenFail(() -> this.queueStorage .setWALPosition(server.getServerName(), queueId, fileName, entryBatch.getLastWalPosition(), entryBatch.getLastSeqIds())); - cleanOldLogs(fileName, entryBatch.isEndOfFile(), queueId, queueRecovered); + cleanOldLogs(fileName, entryBatch.isEndOfFile(), queueId, source.isRecovered()); } /** @@ -959,4 +982,60 @@ public class ReplicationSourceManager implements ReplicationListener { MetricsReplicationGlobalSourceSource getGlobalMetrics() { return this.globalMetrics; } + + /** + * Add an hbase:meta Catalog replication source. Called on open of an hbase:meta Region. + * @see #removeCatalogReplicationSource() + */ + public ReplicationSourceInterface addCatalogReplicationSource() throws IOException { + // Open/Create the hbase:meta ReplicationSource once only. + synchronized (this.catalogReplicationSource) { + ReplicationSourceInterface rs = this.catalogReplicationSource.get(); + if (rs == null) { + // Set and return explicitly; AtomicReference#getAndSet would hand back the old, null + // value on first call. + rs = createCatalogReplicationSource(); + this.catalogReplicationSource.set(rs); + } + return rs; + } + } + + /** + * Remove the hbase:meta Catalog replication source. + * Called when we close hbase:meta. + * @see #addCatalogReplicationSource() + */ + public void removeCatalogReplicationSource() { + // Nothing to do. Leave any CatalogReplicationSource in place in case an hbase:meta Region + // comes back to this server. + } + + /** + * Create, initialize, and start the Catalog ReplicationSource. + */ + private ReplicationSourceInterface createCatalogReplicationSource() throws IOException { + // Has the hbase:meta WALProvider been instantiated? + WALProvider walProvider = this.walFactory.getMetaWALProvider(); + boolean addListener = false; + if (walProvider == null) { + // The meta walProvider has not been instantiated. Create it. + walProvider = this.walFactory.getMetaProvider(); + addListener = true; + } + CatalogReplicationSourcePeer peer = new CatalogReplicationSourcePeer(this.conf, + this.clusterId.toString(), "meta_" + ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_PEER); + final ReplicationSourceInterface crs = new CatalogReplicationSource(); + crs.init(conf, fs, this, new NoopReplicationQueueStorage(), peer, server, peer.getId(), + clusterId, walProvider.getWALFileLengthProvider(), new MetricsSource(peer.getId())); + if (addListener) { + walProvider.addWALActionsListener(new WALActionsListener() { + @Override + public void postLogRoll(Path oldPath, Path newPath) throws IOException { + crs.enqueueLog(newPath); + } + }); + } else { + // This is a problem. We'll have a ReplicationSource but no listener on hbase:meta WALs + // so nothing will be replicated. + LOG.error("Did not install WALActionsListener creating CatalogReplicationSource!"); + } + // Start this ReplicationSource.
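+ // startup() now returns the source itself, so creation and start can be chained in one
+ // expression.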
+ return crs.startup(); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java index b171eb4d78d..f0202ef95f9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java @@ -33,7 +33,6 @@ import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor; @@ -260,8 +259,7 @@ public class ReplicationSourceShipper extends Thread { // position and the file will be removed soon in cleanOldLogs. if (batch.isEndOfFile() || !batch.getLastWalPath().equals(currentPath) || batch.getLastWalPosition() != currentPosition) { - source.getSourceManager().logPositionAndCleanOldLogs(source.getQueueId(), - source.isRecovered(), batch); + source.logPositionAndCleanOldLogs(batch); updated = true; } // if end of file is true, then we can just skip to the next file in queue. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationChecker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationChecker.java index 321bbb420bc..4dd6611763e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationChecker.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationChecker.java @@ -37,7 +37,6 @@ import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import org.apache.hbase.thirdparty.com.google.common.cache.Cache; import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder; import org.apache.hbase.thirdparty.com.google.common.cache.CacheLoader; @@ -140,7 +139,7 @@ class SerialReplicationChecker { public SerialReplicationChecker(Configuration conf, ReplicationSource source) { this.peerId = source.getPeerId(); - this.storage = source.getQueueStorage(); + this.storage = source.getReplicationQueueStorage(); this.conn = source.getServer().getConnection(); this.waitTimeMs = conf.getLong(REPLICATION_SERIALLY_WAITING_KEY, REPLICATION_SERIALLY_WAITING_DEFAULT); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALFileLengthProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALFileLengthProvider.java index 010fa690052..c60faa9e5db 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALFileLengthProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALFileLengthProvider.java @@ -25,6 +25,9 @@ import org.apache.yetus.audience.InterfaceAudience; /** * Used by replication to prevent replicating unacked log entries. See * https://issues.apache.org/jira/browse/HBASE-14004 for more details. + * WALFileLengthProvider exists because we do not want to reference WALFactory and WALProvider + * directly in the replication code so in the future it will be easier to decouple them. 
+ * Each walProvider will have its own implementation. */ @InterfaceAudience.Private @FunctionalInterface diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java index fbd8d30bba6..5583a477a55 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -22,6 +22,8 @@ import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.ReplicationPeerNotFoundException; +import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.client.RegionReplicaUtil; @@ -57,7 +59,15 @@ public class ServerRegionReplicaUtil extends RegionReplicaUtil { public static final String REGION_REPLICA_REPLICATION_CONF_KEY = "hbase.region.replica.replication.enabled"; private static final boolean DEFAULT_REGION_REPLICA_REPLICATION = false; - private static final String REGION_REPLICA_REPLICATION_PEER = "region_replica_replication"; + public static final String REGION_REPLICA_REPLICATION_PEER = "region_replica_replication"; + + /** + * Same as for {@link #REGION_REPLICA_REPLICATION_CONF_KEY} but for catalog replication. + */ + public static final String REGION_REPLICA_REPLICATION_CATALOG_CONF_KEY + = "hbase.region.replica.replication.catalog.enabled"; + private static final boolean DEFAULT_REGION_REPLICA_REPLICATION_CATALOG = false; + /** * Enables or disables refreshing store files of secondary region replicas when the memory is @@ -116,7 +126,6 @@ public class ServerRegionReplicaUtil extends RegionReplicaUtil { * files of the primary region, so an HFileLink is used to construct the StoreFileInfo. This * way ensures that the secondary will be able to continue reading the store files even if * they are moved to archive after compaction - * @throws IOException */ public static StoreFileInfo getStoreFileInfo(Configuration conf, FileSystem fs, RegionInfo regionInfo, RegionInfo regionInfoForFs, String familyName, Path path) @@ -153,8 +162,7 @@ public class ServerRegionReplicaUtil extends RegionReplicaUtil { } /** - * Create replication peer for replicating to region replicas if needed. - *

+ * Create replication peer for replicating user-space Region Read Replicas. * This methods should only be called at master side. */ public static void setupRegionReplicaReplication(MasterServices services) @@ -174,16 +182,42 @@ public class ServerRegionReplicaUtil extends RegionReplicaUtil { services.addReplicationPeer(REGION_REPLICA_REPLICATION_PEER, peerConfig, true); } - public static boolean isRegionReplicaReplicationEnabled(Configuration conf) { - return conf.getBoolean(REGION_REPLICA_REPLICATION_CONF_KEY, - DEFAULT_REGION_REPLICA_REPLICATION); + /** + * @return True if Region Read Replica is enabled for tn (whether hbase:meta or + * user-space tables). + */ + public static boolean isRegionReplicaReplicationEnabled(Configuration conf, TableName tn) { + return isMetaRegionReplicaReplicationEnabled(conf, tn) || + isRegionReplicaReplicationEnabled(conf); } + /** + * @return True if Region Read Replica is enabled for user-space tables. + */ + private static boolean isRegionReplicaReplicationEnabled(Configuration conf) { + return conf.getBoolean(REGION_REPLICA_REPLICATION_CONF_KEY, DEFAULT_REGION_REPLICA_REPLICATION); + } + + /** + * @return True if hbase:meta Region Read Replica is enabled. + */ + public static boolean isMetaRegionReplicaReplicationEnabled(Configuration conf, TableName tn) { + return TableName.isMetaTableName(tn) && + conf.getBoolean(REGION_REPLICA_REPLICATION_CATALOG_CONF_KEY, + DEFAULT_REGION_REPLICA_REPLICATION_CATALOG); + } + + /** + * @return True if wait for primary to flush is enabled for user-space tables. + */ public static boolean isRegionReplicaWaitForPrimaryFlushEnabled(Configuration conf) { return conf.getBoolean(REGION_REPLICA_WAIT_FOR_PRIMARY_FLUSH_CONF_KEY, DEFAULT_REGION_REPLICA_WAIT_FOR_PRIMARY_FLUSH); } + /** + * @return True if we are to refresh user-space hfiles in Region Read Replicas. + */ public static boolean isRegionReplicaStoreFileRefreshEnabled(Configuration conf) { return conf.getBoolean(REGION_REPLICA_STORE_FILE_REFRESH, DEFAULT_REGION_REPLICA_STORE_FILE_REFRESH); @@ -194,11 +228,4 @@ public class ServerRegionReplicaUtil extends RegionReplicaUtil { DEFAULT_REGION_REPLICA_STORE_FILE_REFRESH_MEMSTORE_MULTIPLIER); } - /** - * Return the peer id used for replicating to secondary region replicas - */ - public static String getReplicationPeerId() { - return REGION_REPLICA_REPLICATION_PEER; - } - } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java index 3b7f31106d6..7b8b3a01e32 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -67,7 +67,7 @@ public class WALFactory { /** * Maps between configuration names for providers and implementation classes. */ - static enum Providers { + enum Providers { defaultProvider(AsyncFSWALProvider.class), filesystem(FSHLogProvider.class), multiwal(RegionGroupingProvider.class), @@ -256,8 +256,12 @@ public class WALFactory { return provider.getWALs(); } - @VisibleForTesting - WALProvider getMetaProvider() throws IOException { + /** + * Called when we lazily create a hbase:meta WAL OR from ReplicationSourceManager ahead of + * creating the first hbase:meta WAL so we can register a listener. 
+ * @see #getMetaWALProvider() + */ + public WALProvider getMetaProvider() throws IOException { for (;;) { WALProvider provider = this.metaProvider.get(); if (provider != null) { @@ -306,7 +310,6 @@ public class WALFactory { * to reopen it multiple times, use {@link WAL.Reader#reset()} instead of this method * then just seek back to the last known good position. * @return A WAL reader. Close when done with it. - * @throws IOException */ public Reader createReader(final FileSystem fs, final Path path, CancelableProgressable reporter) throws IOException { @@ -490,6 +493,10 @@ public class WALFactory { return this.provider; } + /** + * @return Current metaProvider... may be null if not yet initialized. + * @see #getMetaProvider() + */ public final WALProvider getMetaWALProvider() { return this.metaProvider.get(); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java index 305a818c918..884dad9ebc8 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -70,8 +70,9 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface { } @Override - public void startup() { + public ReplicationSourceInterface startup() { startup.set(true); + return this; } public boolean isStartup() { @@ -160,4 +161,9 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface { public ServerName getServerWALsBelongTo() { return null; } + + @Override + public ReplicationQueueStorage getReplicationQueueStorage() { + return null; + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestMetaRegionReplicaReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestMetaRegionReplicaReplicationEndpoint.java new file mode 100644 index 00000000000..34432e43a71 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestMetaRegionReplicaReplicationEndpoint.java @@ -0,0 +1,319 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.replication.regionserver; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellScanner; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.MetaTableAccessor; +import org.apache.hadoop.hbase.MiniHBaseCluster; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.Waiter; +import org.apache.hadoop.hbase.client.RegionLocator; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.regionserver.Region; +import org.apache.hadoop.hbase.regionserver.RegionScanner; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil; +import org.junit.After; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Tests RegionReplicaReplicationEndpoint class for hbase:meta by setting up region replicas and + * verifying async wal replication replays the edits to the secondary region in various scenarios. + * @see TestRegionReplicaReplicationEndpoint + */ +@Category({LargeTests.class}) +public class TestMetaRegionReplicaReplicationEndpoint { + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestMetaRegionReplicaReplicationEndpoint.class); + private static final Logger LOG = + LoggerFactory.getLogger(TestMetaRegionReplicaReplicationEndpoint.class); + private static final int NB_SERVERS = 3; + private static final HBaseTestingUtility HTU = new HBaseTestingUtility(); + + @Rule + public TestName name = new TestName(); + + @Before + public void before() throws Exception { + Configuration conf = HTU.getConfiguration(); + conf.setFloat("hbase.regionserver.logroll.multiplier", 0.0003f); + conf.setInt("replication.source.size.capacity", 10240); + conf.setLong("replication.source.sleepforretries", 100); + conf.setInt("hbase.regionserver.maxlogs", 10); + conf.setLong("hbase.master.logcleaner.ttl", 10); + conf.setInt("zookeeper.recovery.retry", 1); + conf.setInt("zookeeper.recovery.retry.intervalmill", 10); + conf.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100); + conf.setInt("replication.stats.thread.period.seconds", 5); + conf.setBoolean("hbase.tests.use.shortcircuit.reads", false); + conf.setInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER, 1); + // Enable hbase:meta replication. + conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CATALOG_CONF_KEY, true); + // Set hbase:meta replicas to be 3. 
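+ // Reusing the server count as the replica count means every server in the minicluster
+ // hosts one hbase:meta replica.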
+ conf.setInt(HConstants.META_REPLICAS_NUM, NB_SERVERS); + HTU.startMiniCluster(NB_SERVERS); + HTU.waitFor(30000, + () -> HTU.getMiniHBaseCluster().getRegions(TableName.META_TABLE_NAME).size() >= NB_SERVERS); + } + + @After + public void after() throws Exception { + HTU.shutdownMiniCluster(); + } + + /** + * Assert that the ReplicationSource for hbase:meta gets created when hbase:meta is opened. + */ + @Test + public void testHBaseMetaReplicationSourceCreatedOnOpen() + throws IOException, InterruptedException { + MiniHBaseCluster cluster = HTU.getMiniHBaseCluster(); + HRegionServer hrs = cluster.getRegionServer(cluster.getServerHoldingMeta()); + assertTrue(isMetaRegionReplicaReplicationSource(hrs)); + // Now move the hbase:meta and make sure the ReplicationSource is in both places. + HRegionServer hrsOther = null; + for (int i = 0; i < cluster.getNumLiveRegionServers(); i++) { + hrsOther = cluster.getRegionServer(i); + if (hrsOther.getServerName().equals(hrs.getServerName())) { + hrsOther = null; + continue; + } + break; + } + assertNotNull(hrsOther); + assertFalse(isMetaRegionReplicaReplicationSource(hrsOther)); + Region meta = null; + for (Region region: hrs.getOnlineRegionsLocalContext()) { + if (region.getRegionInfo().isMetaRegion()) { + meta = region; + break; + } + } + assertNotNull(meta); + HTU.moveRegionAndWait(meta.getRegionInfo(), hrsOther.getServerName()); + // Assert that there is a ReplicationSource in both places now. + assertTrue(isMetaRegionReplicaReplicationSource(hrs)); + assertTrue(isMetaRegionReplicaReplicationSource(hrsOther)); + } + + /** + * @return Whether the special meta region replica peer is enabled on hrs + */ + private boolean isMetaRegionReplicaReplicationSource(HRegionServer hrs) { + return hrs.getReplicationSourceService().getReplicationManager(). + catalogReplicationSource.get() != null; + } + + /** + * Test meta region replica replication. Create some tables and see if replicas pick up the + * additions. + */ + @Test + public void testHBaseMetaReplicates() throws Exception { + try (Table table = HTU.createTable(TableName.valueOf(this.name.getMethodName() + "_0"), + HConstants.CATALOG_FAMILY, + Arrays.copyOfRange(HBaseTestingUtility.KEYS, 1, HBaseTestingUtility.KEYS.length))) { + verifyReplication(TableName.META_TABLE_NAME, NB_SERVERS, getMetaCells(table.getName())); + } + try (Table table = HTU.createTable(TableName.valueOf(this.name.getMethodName() + "_1"), + HConstants.CATALOG_FAMILY, + Arrays.copyOfRange(HBaseTestingUtility.KEYS, 1, HBaseTestingUtility.KEYS.length))) { + verifyReplication(TableName.META_TABLE_NAME, NB_SERVERS, getMetaCells(table.getName())); + // Try delete. + HTU.deleteTableIfAny(table.getName()); + verifyDeletedReplication(TableName.META_TABLE_NAME, NB_SERVERS, table.getName()); + } + } + + /** + * Replicas come online after primary. + */ + private void waitForMetaReplicasToOnline() throws IOException { + final RegionLocator regionLocator = + HTU.getConnection().getRegionLocator(TableName.META_TABLE_NAME); + HTU.waitFor(10000, + // getRegionLocations returns an entry for each replica but if unassigned, entry is null. + // Pass reload to force us to skip cache else it just keeps returning default. + () -> regionLocator.getRegionLocations(HConstants.EMPTY_START_ROW, true).stream().
+
+  /**
+   * Wait until all hbase:meta replicas are online; replicas come online after the primary.
+   */
+  private void waitForMetaReplicasToOnline() throws IOException {
+    final RegionLocator regionLocator =
+      HTU.getConnection().getRegionLocator(TableName.META_TABLE_NAME);
+    HTU.waitFor(10000,
+      // getRegionLocations returns an entry for each replica but if unassigned, entry is null.
+      // Pass reload to force us to skip cache else it just keeps returning default.
+      () -> regionLocator.getRegionLocations(HConstants.EMPTY_START_ROW, true).stream().
+        filter(Objects::nonNull).count() >= NB_SERVERS);
+    List<HRegionLocation> locations =
+      regionLocator.getRegionLocations(HConstants.EMPTY_START_ROW);
+    LOG.info("Found locations {}", locations);
+    assertEquals(NB_SERVERS, locations.size());
+  }
+
+  /**
+   * Scan hbase:meta for tableName content.
+   */
+  private List<Result> getMetaCells(TableName tableName) throws IOException {
+    final List<Result> results = new ArrayList<>();
+    MetaTableAccessor.Visitor visitor = new MetaTableAccessor.Visitor() {
+      @Override
+      public boolean visit(Result r) throws IOException {
+        results.add(r);
+        return true;
+      }
+    };
+    MetaTableAccessor.scanMetaForTableRegions(HTU.getConnection(), visitor, tableName);
+    return results;
+  }
+
+  /**
+   * @return All Regions for tableName, including Replicas.
+   */
+  private Region[] getAllRegions(TableName tableName, int replication) {
+    final Region[] regions = new Region[replication];
+    for (int i = 0; i < NB_SERVERS; i++) {
+      HRegionServer rs = HTU.getMiniHBaseCluster().getRegionServer(i);
+      List<HRegion> onlineRegions = rs.getRegions(tableName);
+      for (HRegion region : onlineRegions) {
+        regions[region.getRegionInfo().getReplicaId()] = region;
+      }
+    }
+    for (Region region : regions) {
+      assertNotNull(region);
+    }
+    return regions;
+  }
+
+  /**
+   * Verify that when a Table is deleted from the primary, no references remain in the replicas
+   * (because they get the delete of the table rows too).
+   */
+  private void verifyDeletedReplication(TableName tableName, int regionReplication,
+      final TableName deletedTableName) {
+    final Region[] regions = getAllRegions(tableName, regionReplication);
+
+    // Start count at '1' so we skip the default, primary replica and only look at secondaries.
+    for (int i = 1; i < regionReplication; i++) {
+      final Region region = regions[i];
+      // Wait until all the data is replicated to all secondary regions.
+      Waiter.waitFor(HTU.getConfiguration(), 30000, 1000, new Waiter.Predicate<Exception>() {
+        @Override
+        public boolean evaluate() throws Exception {
+          LOG.info("Verifying replication for region replica {}", region.getRegionInfo());
+          try (RegionScanner rs = region.getScanner(new Scan())) {
+            List<Cell> cells = new ArrayList<>();
+            while (rs.next(cells)) {
+              continue;
+            }
+            return doesNotContain(cells, deletedTableName);
+          } catch (Throwable ex) {
+            LOG.warn("Verification from secondary region is not complete yet", ex);
+            // Still wait.
+            return false;
+          }
+        }
+      });
+    }
+  }
+
+  /**
+   * Cells are from a hbase:meta replica so rows will start with 'tableName,'; i.e. the table
+   * name followed by HConstants.DELIMITER. Make sure the deleted table is no longer present in
+   * the passed cells.
+   */
+  private boolean doesNotContain(List<Cell> cells, TableName tableName) {
+    for (Cell cell : cells) {
+      String row = Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
+      if (row.startsWith(tableName.toString() + HConstants.DELIMITER)) {
+        return false;
+      }
+    }
+    return true;
+  }
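+
+  // Worked example (hypothetical row key): if table "t1" was deleted, a leftover meta row such
+  // as "t1,,1596151819000.0123456789abcdef0123456789abcdef." still starts with the table name
+  // plus delimiter, so doesNotContain() keeps returning false until the deletes have replicated.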
+
+  /**
+   * Verify that the Replicas have the expected Results (exactly).
+   */
+  private void verifyReplication(TableName tableName, int regionReplication,
+      List<Result> contains) {
+    final Region[] regions = getAllRegions(tableName, regionReplication);
+
+    // Start count at '1' so we skip the default, primary replica and only look at secondaries.
+    for (int i = 1; i < regionReplication; i++) {
+      final Region region = regions[i];
+      // Wait until all the data is replicated to all secondary regions.
+      Waiter.waitFor(HTU.getConfiguration(), 30000, 1000, new Waiter.Predicate<Exception>() {
+        @Override
+        public boolean evaluate() throws Exception {
+          LOG.info("Verifying replication for region replica {}", region.getRegionInfo());
+          try (RegionScanner rs = region.getScanner(new Scan())) {
+            List<Cell> cells = new ArrayList<>();
+            while (rs.next(cells)) {
+              continue;
+            }
+            return contains(contains, cells);
+          } catch (Throwable ex) {
+            LOG.warn("Verification from secondary region is not complete yet", ex);
+            // Still wait.
+            return false;
+          }
+        }
+      });
+    }
+  }
+
+  /**
+   * Presumes the Cells are sorted. Verify that cells contains at least the Cells in
+   * contains, in order, with the final match landing on the last cell of cells.
+   */
+  static boolean contains(List<Result> contains, List<Cell> cells) throws IOException {
+    CellScanner containsScanner = CellUtil.createCellScanner(contains);
+    CellScanner cellsScanner = CellUtil.createCellScanner(cells);
+    int matches = 0;
+    int count = 0;
+    while (containsScanner.advance()) {
+      while (cellsScanner.advance()) {
+        count++;
+        LOG.info("{} {}", containsScanner.current(), cellsScanner.current());
+        if (containsScanner.current().equals(cellsScanner.current())) {
+          matches++;
+          break;
+        }
+      }
+    }
+    return !containsScanner.advance() && matches >= 1 && count >= matches && count == cells.size();
+  }
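+
+  // Worked example for the check above: contains=[a,b] with cells=[a,b] passes, while
+  // cells=[a,b,x] fails: count == cells.size() requires the final match to land on the last
+  // cell, so trailing cells absent from 'contains' are rejected (extras mid-stream are skipped).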
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpoint.java
index c86f2f4849b..54a8dcc73f0 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpoint.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpoint.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -20,12 +20,14 @@ package org.apache.hadoop.hbase.replication.regionserver;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.UUID;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.hadoop.conf.Configuration;
@@ -42,6 +44,7 @@ import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.ReplicationPeerNotFoundException;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.Waiter;
+import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.ClusterConnection;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
@@ -51,6 +54,7 @@ import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
 import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.apache.hadoop.hbase.testclassification.FlakeyTests;
@@ -69,6 +73,7 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.rules.TestName;
+import org.mockito.Mockito;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -124,91 +129,97 @@ public class TestRegionReplicaReplicationEndpoint {
   public void testRegionReplicaReplicationPeerIsCreated()
       throws IOException, ReplicationException {
     // create a table with region replicas. Check whether the replication peer is created
     // and replication started.
-    ReplicationAdmin admin = new ReplicationAdmin(HTU.getConfiguration());
-    String peerId = "region_replica_replication";
+    try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
+        Admin admin = connection.getAdmin()) {
+      String peerId = ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_PEER;
 
-    ReplicationPeerConfig peerConfig = null;
-    try {
-      peerConfig = admin.getPeerConfig(peerId);
-    } catch (ReplicationPeerNotFoundException e) {
-      LOG.warn("Region replica replication peer id=" + peerId + " not exist", e);
-    }
+      ReplicationPeerConfig peerConfig = null;
+      try {
+        peerConfig = admin.getReplicationPeerConfig(peerId);
+      } catch (ReplicationPeerNotFoundException e) {
+        LOG.warn("Region replica replication peer id=" + peerId + " not exist", e);
+      }
 
-    if (peerConfig != null) {
-      admin.removePeer(peerId);
-      peerConfig = null;
-    }
+      if (peerConfig != null) {
+        admin.removeReplicationPeer(peerId);
+        peerConfig = null;
+      }
 
-    HTableDescriptor htd = HTU.createTableDescriptor(
-      "testReplicationPeerIsCreated_no_region_replicas");
-    HTU.getAdmin().createTable(htd);
-    try {
-      peerConfig = admin.getPeerConfig(peerId);
-      fail("Should throw ReplicationException, because replication peer id=" + peerId
-        + " not exist");
-    } catch (ReplicationPeerNotFoundException e) {
-    }
-    assertNull(peerConfig);
+      HTableDescriptor htd = HTU.createTableDescriptor(
+        "testReplicationPeerIsCreated_no_region_replicas");
+      HTU.getAdmin().createTable(htd);
+      try {
+        peerConfig = admin.getReplicationPeerConfig(peerId);
+        fail("Should throw ReplicationException, because replication peer id=" + peerId
+          + " not exist");
+      } catch (ReplicationPeerNotFoundException e) {
+      }
+      assertNull(peerConfig);
 
-    htd = HTU.createTableDescriptor("testReplicationPeerIsCreated");
-    htd.setRegionReplication(2);
-    HTU.getAdmin().createTable(htd);
+      htd = HTU.createTableDescriptor("testReplicationPeerIsCreated");
+      htd.setRegionReplication(2);
+      HTU.getAdmin().createTable(htd);
 
-    // assert peer configuration is correct
-    peerConfig = admin.getPeerConfig(peerId);
-    assertNotNull(peerConfig);
-    assertEquals(peerConfig.getClusterKey(), ZKConfig.getZooKeeperClusterKey(
-      HTU.getConfiguration()));
-    assertEquals(RegionReplicaReplicationEndpoint.class.getName(),
-      peerConfig.getReplicationEndpointImpl());
-    admin.close();
+      // assert peer configuration is correct
+      peerConfig = admin.getReplicationPeerConfig(peerId);
+      assertNotNull(peerConfig);
+      assertEquals(peerConfig.getClusterKey(), ZKConfig.getZooKeeperClusterKey(
+        HTU.getConfiguration()));
+      assertEquals(RegionReplicaReplicationEndpoint.class.getName(),
+        peerConfig.getReplicationEndpointImpl());
+    }
   }
 
   @Test
   public void testRegionReplicaReplicationPeerIsCreatedForModifyTable() throws Exception {
     // modify a table by adding region replicas. Check whether the replication peer is created
     // and replication started.
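+    // As in the previous test, this uses the newer Admin replication API
+    // (getReplicationPeerConfig/removeReplicationPeer) in place of the deprecated
+    // ReplicationAdmin.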
-    ReplicationAdmin admin = new ReplicationAdmin(HTU.getConfiguration());
-    String peerId = "region_replica_replication";
+    try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
+        Admin admin = connection.getAdmin()) {
+      String peerId = ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_PEER;
 
-    ReplicationPeerConfig peerConfig = null;
-    try {
-      peerConfig = admin.getPeerConfig(peerId);
-    } catch (ReplicationPeerNotFoundException e) {
-      LOG.warn("Region replica replication peer id=" + peerId + " not exist", e);
-    }
+      ReplicationPeerConfig peerConfig = null;
+      try {
+        peerConfig = admin.getReplicationPeerConfig(peerId);
+      } catch (ReplicationPeerNotFoundException e) {
+        LOG.warn("Region replica replication peer id=" + peerId + " not exist", e);
+      }
 
-    if (peerConfig != null) {
-      admin.removePeer(peerId);
-      peerConfig = null;
-    }
+      if (peerConfig != null) {
+        admin.removeReplicationPeer(peerId);
+        peerConfig = null;
+      }
 
-    HTableDescriptor htd
-      = HTU.createTableDescriptor("testRegionReplicaReplicationPeerIsCreatedForModifyTable");
-    HTU.getAdmin().createTable(htd);
+      HTableDescriptor htd = HTU.createTableDescriptor(
+        "testRegionReplicaReplicationPeerIsCreatedForModifyTable");
+      HTU.getAdmin().createTable(htd);
 
-    // assert that replication peer is not created yet
-    try {
-      peerConfig = admin.getPeerConfig(peerId);
-      fail("Should throw ReplicationException, because replication peer id=" + peerId
-        + " not exist");
-    } catch (ReplicationPeerNotFoundException e) {
-    }
-    assertNull(peerConfig);
+      // assert that replication peer is not created yet
+      try {
+        peerConfig = admin.getReplicationPeerConfig(peerId);
+        fail("Should throw ReplicationException, because replication peer id=" + peerId
+          + " not exist");
+      } catch (ReplicationPeerNotFoundException e) {
+      }
+      assertNull(peerConfig);
 
-    HTU.getAdmin().disableTable(htd.getTableName());
-    htd.setRegionReplication(2);
-    HTU.getAdmin().modifyTable(htd.getTableName(), htd);
-    HTU.getAdmin().enableTable(htd.getTableName());
+      HTU.getAdmin().disableTable(htd.getTableName());
+      htd.setRegionReplication(2);
+      HTU.getAdmin().modifyTable(htd.getTableName(), htd);
+      HTU.getAdmin().enableTable(htd.getTableName());
 
-    // assert peer configuration is correct
-    peerConfig = admin.getPeerConfig(peerId);
-    assertNotNull(peerConfig);
-    assertEquals(peerConfig.getClusterKey(), ZKConfig.getZooKeeperClusterKey(
-      HTU.getConfiguration()));
-    assertEquals(RegionReplicaReplicationEndpoint.class.getName(),
-      peerConfig.getReplicationEndpointImpl());
-    admin.close();
+      // assert peer configuration is correct
+      peerConfig = admin.getReplicationPeerConfig(peerId);
+      assertNotNull(peerConfig);
+      assertEquals(peerConfig.getClusterKey(),
+        ZKConfig.getZooKeeperClusterKey(HTU.getConfiguration()));
+      assertEquals(RegionReplicaReplicationEndpoint.class.getName(),
+        peerConfig.getReplicationEndpointImpl());
+    }
   }
 
   public void testRegionReplicaReplication(int regionReplication) throws Exception {
@@ -405,8 +416,7 @@ public class TestRegionReplicaReplicationEndpoint {
     HTU.getAdmin().createTable(htd);
 
     // both tables are created, now pause replication
-    ReplicationAdmin admin = new ReplicationAdmin(HTU.getConfiguration());
-    admin.disablePeer(ServerRegionReplicaUtil.getReplicationPeerId());
+    HTU.getAdmin().disableReplicationPeer(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_PEER);
 
     // now that the replication is disabled, write to the table to be dropped, then drop the table.
@@ -450,6 +460,27 @@ public class TestRegionReplicaReplicationEndpoint {
     assertEquals(2, skippedEdits.get());
+    HRegionServer rs = HTU.getMiniHBaseCluster().getRegionServer(0);
+    MetricsSource metrics = mock(MetricsSource.class);
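+    // The Context arguments below map roughly to: local conf, peer conf, filesystem, peer id,
+    // source cluster UUID, the ReplicationPeer, source metrics, table descriptors, and an
+    // Abortable (here the regionserver). This mapping is read off the call itself and is noted
+    // only as an aid; see ReplicationEndpoint.Context for the authoritative signature.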
+    ReplicationEndpoint.Context ctx =
+      new ReplicationEndpoint.Context(HTU.getConfiguration(), HTU.getConfiguration(),
+        HTU.getTestFileSystem(), ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_PEER,
+        UUID.fromString(rs.getClusterId()), rs.getReplicationSourceService().
+          getReplicationManager().getReplicationPeers()
+          .getPeer(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_PEER),
+        metrics, rs.getTableDescriptors(), rs);
+    RegionReplicaReplicationEndpoint rrpe = new RegionReplicaReplicationEndpoint();
+    rrpe.init(ctx);
+    rrpe.start();
+    ReplicationEndpoint.ReplicateContext repCtx = new ReplicationEndpoint.ReplicateContext();
+    repCtx.setEntries(Lists.newArrayList(entry, entry));
+    assertTrue(rrpe.replicate(repCtx));
+    /* Come back here. There is a difference on how counting is done here and in master branch.
+       St.Ack
+    Mockito.verify(metrics, Mockito.times(1)).
+      incrLogEditsFiltered(Mockito.eq(2L));
+    */
+    rrpe.stop();
     if (disableReplication) {
       // enable replication again so that we can verify replication
       HTU.getAdmin().disableTable(toBeDisabledTable); // disable the table
@@ -465,12 +496,11 @@ public class TestRegionReplicaReplicationEndpoint {
       HTU.loadNumericRows(table, HBaseTestingUtility.fam1, 0, 1000);
 
       // now enable the replication
-      admin.enablePeer(ServerRegionReplicaUtil.getReplicationPeerId());
+      HTU.getAdmin().enableReplicationPeer(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_PEER);
 
       verifyReplication(tableName, regionReplication, 0, 1000);
 
     } finally {
-      admin.close();
       table.close();
       rl.close();
       tableToBeDisabled.close();
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
index 5f7a1c299ed..2745ad668ee 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
@@ -320,7 +320,7 @@ public abstract class TestReplicationSourceManager {
 
     wal.rollWriter();
 
-    manager.logPositionAndCleanOldLogs("1", false,
+    manager.logPositionAndCleanOldLogs(manager.getSources().get(0),
       new WALEntryBatch(0, manager.getSources().get(0).getCurrentPath()));
 
     wal.appendData(hri,
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationChecker.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationChecker.java
index 29749bdc67b..9e513cd7ec2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationChecker.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationChecker.java
@@ -109,7 +109,7 @@ public class TestSerialReplicationChecker {
   public void setUp() throws IOException {
     ReplicationSource source = mock(ReplicationSource.class);
     when(source.getPeerId()).thenReturn(PEER_ID);
-    when(source.getQueueStorage()).thenReturn(QUEUE_STORAGE);
+    when(source.getReplicationQueueStorage()).thenReturn(QUEUE_STORAGE);
     conn = mock(Connection.class);
     when(conn.isClosed()).thenReturn(false);
     doAnswer(new Answer() {