HBASE-25055 Add ReplicationSource for meta WALs; add enable/disable w… (#2451)

* HBASE-25055 Add ReplicationSource for meta WALs; add enable/disable when hbase:meta assigned to RS

Fills a gap left by HBASE-11183 'Timeline Consistent region replicas - Phase 2 design'.
HBASE-11183 stopped short of implementing 'async WAL Replication' for the hbase:meta
Table; hbase:meta could only do Phase 1 Region Replicas, reading the
primary Regions' hfiles. Here we add 'async WAL Replication' to
hbase:meta so Replicas can be more current with the primary's changes.

Adds a 'special' ReplicationSource that reads hbase:meta WAL files and replicates
all edits to the configured in-cluster endpoint (defaults to
RegionReplicaReplicationEndpoint.class; set hbase.region.replica.catalog.replication
to target a different endpoint implementation).

Set hbase.region.replica.replication.catalog.enabled to enable async WAL
Replication for hbase:meta region replicas. It's off by default.
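
For example, enabling it might look like the minimal sketch below (keys and classes
as used by this patch; in practice these settings would go into hbase-site.xml
before cluster start):

  Configuration conf = HBaseConfiguration.create();
  // Run hbase:meta with replicas: one primary plus two read replicas.
  conf.setInt(HConstants.META_REPLICAS_NUM, 3);
  // Enable async WAL Replication for hbase:meta region replicas; off by default.
  conf.setBoolean("hbase.region.replica.replication.catalog.enabled", true);
  // Optional: name a different in-cluster endpoint implementation.
  conf.set("hbase.region.replica.catalog.replication",
      RegionReplicaReplicationEndpoint.class.getName());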

The CatalogReplicationSource for async WAL Replication of hbase:meta does
NOT need to keep a WAL offset or a queue of WALs-to-replicate in the
replication queue store, as other ReplicationSource implementations do;
the CatalogReplicationSource is for Region Replicas only. General
Replication does not replicate hbase:meta. hbase:meta Region Replicas reset
on crash of the primary replica, so there is no need to 'recover'
replication that was running on the crashed server.
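
In outline, the bypass works as in the sketch below: ReplicationSourceInterface
gains a default method carrying the old persisting behavior, and
CatalogReplicationSource (both in the diff that follows) overrides it away:

  // Default on ReplicationSourceInterface: persist the WAL position via the manager.
  default void logPositionAndCleanOldLogs(WALEntryBatch entryBatch) {
    getSourceManager().logPositionAndCleanOldLogs(this, entryBatch);
  }

  // Override in CatalogReplicationSource: nothing to persist, nothing to recover.
  @Override
  public void logPositionAndCleanOldLogs(WALEntryBatch entryBatch) {
  }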

Because it is so different in operation, the CatalogReplicationSource is bolted
on to the side of the ReplicationSourceManager. It is lazily
instantiated to match the lazy instantiation of the hbase:meta
WALProvider, created and started on the open of the first Region of the
hbase:meta table. Thereafter it stays up until the process dies, even if
all hbase:meta Regions have moved off the server, in case an hbase:meta
Region is moved back (doing the latter simplifies the implementation).
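
The create-once lifecycle reduces to the small guarded pattern sketched below
(see ReplicationSourceManager#addCatalogReplicationSource in the diff):

  // Synchronize on the AtomicReference holder so the CatalogReplicationSource is
  // created and started at most once per server lifetime; close of hbase:meta
  // deliberately leaves it in place.
  synchronized (this.catalogReplicationSource) {
    ReplicationSourceInterface rs = this.catalogReplicationSource.get();
    return rs != null ? rs :
      this.catalogReplicationSource.getAndSet(createCatalogReplicationSource());
  }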

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
  Read configuration to see if we need to wait before setting a Region read-enabled
  (if so, replicas only flip to read-enabled after confirming a flush of the primary,
  so they are for sure a replica of a known point; see the sketch after this file
  list).

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/AssignRegionHandler.java
 If configured, on open of hbase:meta, ask the ReplicationSourceManager
 to add a ReplicationSource (if it hasn't already).

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/RegionReplicaFlushHandler.java
 Edit log message.

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/UnassignRegionHandler.java
 If configured, on close of hbase:meta, update ReplicationSourceManager
 that a source Region has closed.

hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceFactory.java
 Javadoc; make the constructor private.

hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
 Add logPositionAndCleanOldLogs w/ default of the old behavior so
 CatalogReplicationSource can bypass updating store with WAL position,
 etc.

hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
 Add creation and start of a CatalogReplicationSource.

hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
 Go via the ReplicationSource when calling logPositionAndCleanOldLogs so the new
 ReplicationSource implementation can intercept.

hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALFileLengthProvider.java
 Javadoc.

hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java
 Add utility for reading configurations for hbase:meta region replicas.

hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java
 Javadoc.

hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpoint.java
 Use the defined constant.

hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/CatalogReplicationSource.java
 Special version of ReplicationSource for Region Replicas on hbase:meta.

hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/CatalogReplicationSourcePeer.java
 Needs a special peer too (peers are baked into replication, though we don't use
 'peering' here).

hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestMetaRegionReplicaReplicationEndpoint.java
hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALProvider.java
 Tests.
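
As referenced in the HRegionServer note above, the read-enable gate reduces to the
sketch below; it mirrors the first hunk of the diff, with explanatory comments added:

  // If region replica replication is off for this table, or we were not asked to
  // wait on a primary flush, the replica may serve reads immediately.
  TableName tn = region.getTableDescriptor().getTableName();
  if (!ServerRegionReplicaUtil.isRegionReplicaReplicationEnabled(region.conf, tn) ||
      !ServerRegionReplicaUtil.isRegionReplicaWaitForPrimaryFlushEnabled(region.conf)) {
    region.setReadsEnabled(true);
    return;
  }
  // Otherwise reads stay disabled until a flush of the primary is confirmed, so the
  // replica is for sure a replica of a known point.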

Signed-off-by: Duo Zhang <zhangduo@apache.org>
Signed-off-by: Huaxiang Sun <huaxiangsun@apache.org>
Michael Stack 2020-10-02 12:29:18 -07:00 committed by stack
parent 2b1e8b306f
commit de1843ac94
23 changed files with 893 additions and 160 deletions


@ -2473,9 +2473,9 @@ public class HRegionServer extends Thread implements
if (ServerRegionReplicaUtil.isDefaultReplica(region.getRegionInfo())) {
return;
}
if (!ServerRegionReplicaUtil.isRegionReplicaReplicationEnabled(region.conf) ||
!ServerRegionReplicaUtil.isRegionReplicaWaitForPrimaryFlushEnabled(
region.conf)) {
TableName tn = region.getTableDescriptor().getTableName();
if (!ServerRegionReplicaUtil.isRegionReplicaReplicationEnabled(region.conf, tn) ||
!ServerRegionReplicaUtil.isRegionReplicaWaitForPrimaryFlushEnabled(region.conf)) {
region.setReadsEnabled(true);
return;
}


@ -20,8 +20,11 @@ package org.apache.hadoop.hbase.regionserver.handler;
import edu.umd.cs.findbugs.annotations.Nullable;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.executor.EventHandler;
import org.apache.hadoop.hbase.executor.EventType;
@ -31,6 +34,7 @@ import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionServerServices.PostOpenDeployContext;
import org.apache.hadoop.hbase.regionserver.RegionServerServices.RegionStateTransitionContext;
import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -129,8 +133,15 @@ public class AssignRegionHandler extends EventHandler {
}
// pass null for the last parameter, which used to be a CancelableProgressable, as now the
// opening can not be interrupted by a close request any more.
region = HRegion.openHRegion(regionInfo, htd, rs.getWAL(regionInfo), rs.getConfiguration(),
rs, null);
Configuration conf = rs.getConfiguration();
TableName tn = htd.getTableName();
if (ServerRegionReplicaUtil.isMetaRegionReplicaReplicationEnabled(conf, tn)) {
if (RegionReplicaUtil.isDefaultReplica(this.regionInfo.getReplicaId())) {
// Add the hbase:meta replication source on replica zero/default.
rs.getReplicationSourceService().getReplicationManager().addCatalogReplicationSource();
}
}
region = HRegion.openHRegion(regionInfo, htd, rs.getWAL(regionInfo), conf, rs, null);
} catch (IOException e) {
cleanUpAndReportFailure(e);
return;


@ -109,9 +109,9 @@ public class RegionReplicaFlushHandler extends EventHandler {
RetryCounter counter = new RetryCounterFactory(maxAttempts, (int)pause).create();
if (LOG.isDebugEnabled()) {
LOG.debug("RPC'ing to primary region replica " +
ServerRegionReplicaUtil.getRegionInfoForDefaultReplica(region.getRegionInfo()) + " from " +
region.getRegionInfo() + " to trigger FLUSH");
LOG.debug("RPC'ing to primary " + ServerRegionReplicaUtil.
getRegionInfoForDefaultReplica(region.getRegionInfo()).getRegionNameAsString() +
" from " + region.getRegionInfo().getRegionNameAsString() + " to trigger FLUSH");
}
while (!region.isClosing() && !region.isClosed()
&& !server.isAborted() && !server.isStopped()) {


@ -22,6 +22,7 @@ import java.io.IOException;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.executor.EventHandler;
import org.apache.hadoop.hbase.executor.EventType;
import org.apache.hadoop.hbase.regionserver.HRegion;
@ -30,6 +31,7 @@ import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionServerServices.RegionStateTransitionContext;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -113,12 +115,20 @@ public class UnassignRegionHandler extends EventHandler {
if (region.close(abort) == null) {
// XXX: Is this still possible? The old comment says about split, but now split is done at
// master side, so...
LOG.warn("Can't close {} already closed during close()", regionName);
LOG.warn("Can't close {}, already closed during close()", regionName);
rs.getRegionsInTransitionInRS().remove(encodedNameBytes, Boolean.FALSE);
return;
}
rs.removeRegion(region, destination);
if (ServerRegionReplicaUtil.isMetaRegionReplicaReplicationEnabled(rs.getConfiguration(),
region.getTableDescriptor().getTableName())) {
if (RegionReplicaUtil.isDefaultReplica(region.getRegionInfo().getReplicaId())) {
// If hbase:meta read replicas enabled, remove replication source for hbase:meta Regions.
// See assign region handler where we add the replication source on open.
rs.getReplicationSourceService().getReplicationManager().removeCatalogReplicationSource();
}
}
if (!rs.reportRegionStateTransition(
new RegionStateTransitionContext(TransitionCode.CLOSED, HConstants.NO_SEQNUM, closeProcId,
-1, region.getRegionInfo()))) {


@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication.regionserver;
import java.util.Collections;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.yetus.audience.InterfaceAudience;
/**
* ReplicationSource that reads catalog WAL files -- e.g. hbase:meta WAL files -- and lets through
* all WALEdits from these WALs. This ReplicationSource is NOT created via
* {@link ReplicationSourceFactory}.
*/
@InterfaceAudience.Private
class CatalogReplicationSource extends ReplicationSource {
CatalogReplicationSource() {
// Filters in hbase:meta WAL files and allows all edits, including 'meta' edits (these are
// filtered out in the 'super' class default implementation).
super(p -> AbstractFSWALProvider.isMetaFile(p), Collections.emptyList());
}
@Override
public void logPositionAndCleanOldLogs(WALEntryBatch entryBatch) {
// Noop. This implementation does not persist state to backing storage nor does it keep its
// WALs in a general map up in ReplicationSourceManager so just skip calling through to the
// default implementation.
}
}


@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication.regionserver;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerImpl;
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
import org.apache.yetus.audience.InterfaceAudience;
/**
* The 'peer' used internally by Catalog Region Replicas Replication Source.
* The Replication system has 'peer' baked into its core so though we do not need 'peering', we
* need a 'peer' and its configuration else the replication system breaks in a few places.
* Set "hbase.region.replica.catalog.replication" if you want to change the configured endpoint.
*/
@InterfaceAudience.Private
class CatalogReplicationSourcePeer extends ReplicationPeerImpl {
/**
* @param clusterKey Usually the UUID from zk passed in by caller as a String.
*/
CatalogReplicationSourcePeer(Configuration configuration, String clusterKey, String peerId) {
super(configuration, ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_PEER + "_catalog",
true,
ReplicationPeerConfig.newBuilder().
setClusterKey(clusterKey).
setReplicationEndpointImpl(
configuration.get("hbase.region.replica.catalog.replication",
RegionReplicaReplicationEndpoint.class.getName())).
setBandwidth(0). // '0' means no bandwidth.
setSerial(false).
build());
}
}


@ -0,0 +1,140 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication.regionserver;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;
/**
* Noop queue storage -- does nothing.
*/
@InterfaceAudience.Private
class NoopReplicationQueueStorage implements ReplicationQueueStorage {
NoopReplicationQueueStorage() {}
@Override
public void removeQueue(ServerName serverName, String queueId) throws ReplicationException {}
@Override
public void addWAL(ServerName serverName, String queueId, String fileName)
throws ReplicationException {}
@Override
public void removeWAL(ServerName serverName, String queueId, String fileName)
throws ReplicationException { }
@Override
public void setWALPosition(ServerName serverName, String queueId, String fileName, long position,
Map<String, Long> lastSeqIds) throws ReplicationException {}
@Override
public long getLastSequenceId(String encodedRegionName, String peerId)
throws ReplicationException {
return 0;
}
@Override
public void setLastSequenceIds(String peerId, Map<String, Long> lastSeqIds)
throws ReplicationException {}
@Override
public void removeLastSequenceIds(String peerId) throws ReplicationException {}
@Override
public void removeLastSequenceIds(String peerId, List<String> encodedRegionNames)
throws ReplicationException {}
@Override
public long getWALPosition(ServerName serverName, String queueId, String fileName)
throws ReplicationException {
return 0;
}
@Override
public List<String> getWALsInQueue(ServerName serverName, String queueId)
throws ReplicationException {
return Collections.EMPTY_LIST;
}
@Override
public List<String> getAllQueues(ServerName serverName) throws ReplicationException {
return Collections.EMPTY_LIST;
}
@Override
public Pair<String, SortedSet<String>> claimQueue(ServerName sourceServerName, String queueId,
ServerName destServerName) throws ReplicationException {
return null;
}
@Override
public void removeReplicatorIfQueueIsEmpty(ServerName serverName)
throws ReplicationException {}
@Override
public List<ServerName> getListOfReplicators() throws ReplicationException {
return Collections.EMPTY_LIST;
}
@Override
public Set<String> getAllWALs() throws ReplicationException {
return Collections.EMPTY_SET;
}
@Override
public void addPeerToHFileRefs(String peerId) throws ReplicationException {}
@Override
public void removePeerFromHFileRefs(String peerId) throws ReplicationException {}
@Override
public void addHFileRefs(String peerId, List<Pair<Path, Path>> pairs)
throws ReplicationException {}
@Override
public void removeHFileRefs(String peerId, List<String> files) throws ReplicationException {}
@Override
public List<String> getAllPeersFromHFileRefsQueue() throws ReplicationException {
return Collections.EMPTY_LIST;
}
@Override
public List<String> getReplicableHFiles(String peerId) throws ReplicationException {
return Collections.EMPTY_LIST;
}
@Override
public Set<String> getAllHFileRefs() throws ReplicationException {
return Collections.EMPTY_SET;
}
@Override
public String getRsNode(ServerName serverName) {
return null;
}
}


@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information


@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.replication.regionserver;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.OptionalLong;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@ -124,11 +123,11 @@ public class Replication implements ReplicationSourceService, ReplicationSinkSer
}
this.globalMetricsSource = CompatibilitySingletonFactory
.getInstance(MetricsReplicationSourceFactory.class).getGlobalSource();
WALProvider walProvider = walFactory.getWALProvider();
this.replicationManager = new ReplicationSourceManager(queueStorage, replicationPeers,
replicationTracker, conf, this.server, fs, logDir, oldLogDir, clusterId,
walProvider != null ? walProvider.getWALFileLengthProvider() : p -> OptionalLong.empty(),
globalMetricsSource);
replicationTracker, conf, this.server, fs, logDir, oldLogDir, clusterId, walFactory,
globalMetricsSource);
// Get the user-space WAL provider
WALProvider walProvider = walFactory != null? walFactory.getWALProvider(): null;
if (walProvider != null) {
walProvider
.addWALActionsListener(new ReplicationSourceWALActionListener(conf, replicationManager));


@ -65,10 +65,10 @@ import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
/**
@ -227,6 +227,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
this.peerId = this.replicationQueueInfo.getPeerId();
this.logQueueWarnThreshold = this.conf.getInt("replication.source.log.queue.warn", 2);
// A defaultBandwidth of '0' means no bandwidth; i.e. no throttling.
defaultBandwidth = this.conf.getLong("replication.source.per.peer.node.bandwidth", 0);
currentBandwidth = getCurrentBandwidth();
this.throttler = new ReplicationThrottler((double) currentBandwidth / 10.0);
@ -378,16 +379,10 @@ public class ReplicationSource implements ReplicationSourceInterface {
private void tryStartNewShipper(String walGroupId, PriorityBlockingQueue<Path> queue) {
workerThreads.compute(walGroupId, (key, value) -> {
if (value != null) {
if (LOG.isDebugEnabled()) {
LOG.debug(
"{} Someone has beat us to start a worker thread for wal group {}",
logPeerId(), key);
}
LOG.debug("{} preempted start of shipping worker walGroupId={}", logPeerId(), walGroupId);
return value;
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("{} Starting up worker for wal group {}", logPeerId(), key);
}
LOG.debug("{} starting shipping worker for walGroupId={}", logPeerId(), walGroupId);
ReplicationSourceShipper worker = createNewShipper(walGroupId, queue);
ReplicationSourceWALReader walReader =
createNewWALReader(walGroupId, queue, worker.getStartPosition());
@ -532,7 +527,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
private long getCurrentBandwidth() {
long peerBandwidth = replicationPeer.getPeerBandwidth();
// user can set peer bandwidth to 0 to use default bandwidth
// User can set peer bandwidth to 0 to use default bandwidth.
return peerBandwidth != 0 ? peerBandwidth : defaultBandwidth;
}
@ -627,11 +622,11 @@ public class ReplicationSource implements ReplicationSourceInterface {
this.startupOngoing.set(false);
throw new IllegalStateException("Source should be active.");
}
LOG.info("{} queueId={} is replicating from cluster={} to cluster={}",
logPeerId(), this.replicationQueueInfo.getQueueId(), clusterId, peerClusterId);
LOG.info("{} queueId={} (queues={}) is replicating from cluster={} to cluster={}",
logPeerId(), this.replicationQueueInfo.getQueueId(), this.queues.size(), clusterId,
peerClusterId);
initializeWALEntryFilter(peerClusterId);
// start workers
// Start workers
for (Map.Entry<String, PriorityBlockingQueue<Path>> entry : queues.entrySet()) {
String walGroupId = entry.getKey();
PriorityBlockingQueue<Path> queue = entry.getValue();
@ -641,11 +636,10 @@ public class ReplicationSource implements ReplicationSourceInterface {
}
@Override
public void startup() {
public ReplicationSourceInterface startup() {
if (this.sourceRunning) {
return;
return this;
}
// Mark we are running now
this.sourceRunning = true;
startupOngoing.set(true);
initThread = new Thread(this::initialize);
@ -673,6 +667,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
}
} while ((this.startupOngoing.get() || this.retryStartup.get()) && !this.abortOnError);
});
return this;
}
@Override
@ -851,7 +846,8 @@ public class ReplicationSource implements ReplicationSourceInterface {
return server;
}
ReplicationQueueStorage getQueueStorage() {
@Override
public ReplicationQueueStorage getReplicationQueueStorage() {
return queueStorage;
}


@ -19,19 +19,22 @@
package org.apache.hadoop.hbase.replication.regionserver;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
/**
* Constructs a {@link ReplicationSourceInterface}.
* Note: not used to create specialized ReplicationSources.
* @see CatalogReplicationSource
*/
@InterfaceAudience.Private
public class ReplicationSourceFactory {
public final class ReplicationSourceFactory {
private static final Logger LOG = LoggerFactory.getLogger(ReplicationSourceFactory.class);
private ReplicationSourceFactory() {}
static ReplicationSourceInterface create(Configuration conf, String queueId) {
ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(queueId);
boolean isQueueRecovered = replicationQueueInfo.isQueueRecovered();


@ -23,7 +23,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@ -43,7 +42,6 @@ import org.apache.yetus.audience.InterfaceAudience;
*/
@InterfaceAudience.Private
public interface ReplicationSourceInterface {
/**
* Initializer for the source
* @param conf the configuration to use
@ -76,7 +74,7 @@ public interface ReplicationSourceInterface {
/**
* Start the replication
*/
void startup();
ReplicationSourceInterface startup();
/**
* End the replication
@ -159,7 +157,6 @@ public interface ReplicationSourceInterface {
/**
* Try to throttle when the peer config with a bandwidth
* @param batchSize entries size will be pushed
* @throws InterruptedException
*/
void tryThrottle(int batchSize) throws InterruptedException;
@ -191,4 +188,21 @@ public interface ReplicationSourceInterface {
default boolean isRecovered() {
return false;
}
/**
* @return The instance of queueStorage used by this ReplicationSource.
*/
ReplicationQueueStorage getReplicationQueueStorage();
/**
* Log the current position to storage. Also clean old logs from the replication queue.
* Override to bypass the default call to
* {@link ReplicationSourceManager#logPositionAndCleanOldLogs(ReplicationSourceInterface,
* WALEntryBatch)} when the implementation does not need to persist state to backing storage.
* @param entryBatch the wal entry batch we just shipped
*/
default void logPositionAndCleanOldLogs(WALEntryBatch entryBatch) {
getSourceManager().logPositionAndCleanOldLogs(this, entryBatch);
}
}


@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@ -26,6 +26,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.OptionalLong;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
@ -39,6 +40,7 @@ import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@ -48,6 +50,7 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationListener;
import org.apache.hadoop.hbase.replication.ReplicationPeer;
@ -58,12 +61,14 @@ import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationTracker;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALProvider;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
@ -120,7 +125,15 @@ public class ReplicationSourceManager implements ReplicationListener {
private final ConcurrentMap<String, ReplicationSourceInterface> sources;
// List of all the sources we got from died RSs
private final List<ReplicationSourceInterface> oldsources;
/**
* Storage for queues that need persistence; e.g. Replication state so it can be recovered
* after a crash. queueStorage upkeep is spread about this class and passed
* to ReplicationSource instances for these to do updates themselves. Not all ReplicationSource
* instances keep state.
*/
private final ReplicationQueueStorage queueStorage;
private final ReplicationTracker replicationTracker;
private final ReplicationPeers replicationPeers;
// UUID for this cluster
@ -145,7 +158,7 @@ public class ReplicationSourceManager implements ReplicationListener {
private final Path logDir;
// Path to the wal archive
private final Path oldLogDir;
private final WALFileLengthProvider walFileLengthProvider;
private final WALFactory walFactory;
// The number of ms that we wait before moving znodes, HBASE-3596
private final long sleepBeforeFailover;
// Homemade executer service for replication
@ -159,22 +172,30 @@ public class ReplicationSourceManager implements ReplicationListener {
private final long totalBufferLimit;
private final MetricsReplicationGlobalSourceSource globalMetrics;
/**
* A special ReplicationSource for hbase:meta Region Read Replicas.
* Usually this reference remains empty. If an hbase:meta Region is opened on this server, we
* will create an instance of a hbase:meta CatalogReplicationSource and it will live the life of
* the Server thereafter; i.e. we will not shut it down even if the hbase:meta moves away from
* this server (in case it later gets moved back). We synchronize on this instance when testing
* for presence and, if absent, while creating it, so it is created and started only once.
*/
@VisibleForTesting
AtomicReference<ReplicationSourceInterface> catalogReplicationSource = new AtomicReference<>();
/**
* Creates a replication manager and sets the watch on all the other registered region servers
* @param queueStorage the interface for manipulating replication queues
* @param replicationPeers
* @param replicationTracker
* @param conf the configuration to use
* @param server the server for this region server
* @param fs the file system to use
* @param logDir the directory that contains all wal directories of live RSs
* @param oldLogDir the directory where old logs are archived
* @param clusterId
*/
public ReplicationSourceManager(ReplicationQueueStorage queueStorage,
ReplicationPeers replicationPeers, ReplicationTracker replicationTracker, Configuration conf,
Server server, FileSystem fs, Path logDir, Path oldLogDir, UUID clusterId,
WALFileLengthProvider walFileLengthProvider,
WALFactory walFactory,
MetricsReplicationGlobalSourceSource globalMetrics) throws IOException {
// CopyOnWriteArrayList is thread-safe.
// Generally, reading is more than modifying.
@ -193,7 +214,7 @@ public class ReplicationSourceManager implements ReplicationListener {
this.sleepBeforeFailover = conf.getLong("replication.sleep.before.failover", 30000); // 30
// seconds
this.clusterId = clusterId;
this.walFileLengthProvider = walFileLengthProvider;
this.walFactory = walFactory;
this.replicationTracker.registerListener(this);
// It's preferable to failover 1 RS at a time, but with good zk servers
// more could be processed at the same time.
@ -320,18 +341,21 @@ public class ReplicationSourceManager implements ReplicationListener {
}
/**
* Factory method to create a replication source
* @param queueId the id of the replication queue
* @return the created source
* @return a new 'classic' user-space replication source.
* @param queueId the id of the replication queue to associate the ReplicationSource with.
* @see #createCatalogReplicationSource() for creating a ReplicationSource for hbase:meta.
*/
private ReplicationSourceInterface createSource(String queueId, ReplicationPeer replicationPeer)
throws IOException {
ReplicationSourceInterface src = ReplicationSourceFactory.create(conf, queueId);
MetricsSource metrics = new MetricsSource(queueId);
// init replication source
// Init the just created replication source. Pass the default walProvider's wal file length
// provider. Presumption is we replicate user-space Tables only. For hbase:meta region replica
// replication, see #createCatalogReplicationSource().
WALFileLengthProvider walFileLengthProvider =
this.walFactory.getWALProvider() != null?
this.walFactory.getWALProvider().getWALFileLengthProvider() : p -> OptionalLong.empty();
src.init(conf, fs, this, queueStorage, replicationPeer, server, queueId, clusterId,
walFileLengthProvider, metrics);
walFileLengthProvider, new MetricsSource(queueId));
return src;
}
@ -518,17 +542,16 @@ public class ReplicationSourceManager implements ReplicationListener {
/**
* This method will log the current position to storage. And also clean old logs from the
* replication queue.
* @param queueId id of the replication queue
* @param queueRecovered indicates if this queue comes from another region server
* @param entryBatch the wal entry batch we just shipped
*/
public void logPositionAndCleanOldLogs(String queueId, boolean queueRecovered,
public void logPositionAndCleanOldLogs(ReplicationSourceInterface source,
WALEntryBatch entryBatch) {
String fileName = entryBatch.getLastWalPath().getName();
String queueId = source.getQueueId();
interruptOrAbortWhenFail(() -> this.queueStorage
.setWALPosition(server.getServerName(), queueId, fileName, entryBatch.getLastWalPosition(),
entryBatch.getLastSeqIds()));
cleanOldLogs(fileName, entryBatch.isEndOfFile(), queueId, queueRecovered);
cleanOldLogs(fileName, entryBatch.isEndOfFile(), queueId, source.isRecovered());
}
/**
@ -959,4 +982,60 @@ public class ReplicationSourceManager implements ReplicationListener {
MetricsReplicationGlobalSourceSource getGlobalMetrics() {
return this.globalMetrics;
}
/**
* Add an hbase:meta Catalog replication source. Called on open of an hbase:meta Region.
* @see #removeCatalogReplicationSource()
*/
public ReplicationSourceInterface addCatalogReplicationSource() throws IOException {
// Open/Create the hbase:meta ReplicationSource once only.
synchronized (this.catalogReplicationSource) {
ReplicationSourceInterface rs = this.catalogReplicationSource.get();
return rs != null ? rs :
this.catalogReplicationSource.getAndSet(createCatalogReplicationSource());
}
}
/**
* Remove the hbase:meta Catalog replication source.
* Called when we close hbase:meta.
* @see #addCatalogReplicationSource()
*/
public void removeCatalogReplicationSource() {
// Nothing to do. Leave any CatalogReplicationSource in place in case an hbase:meta Region
// comes back to this server.
}
/**
* Create, initialize, and start the Catalog ReplicationSource.
*/
private ReplicationSourceInterface createCatalogReplicationSource() throws IOException {
// Has the hbase:meta WALProvider been instantiated?
WALProvider walProvider = this.walFactory.getMetaWALProvider();
boolean addListener = false;
if (walProvider == null) {
// The meta walProvider has not been instantiated. Create it.
walProvider = this.walFactory.getMetaProvider();
addListener = true;
}
CatalogReplicationSourcePeer peer = new CatalogReplicationSourcePeer(this.conf,
this.clusterId.toString(), "meta_" + ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_PEER);
final ReplicationSourceInterface crs = new CatalogReplicationSource();
crs.init(conf, fs, this, new NoopReplicationQueueStorage(), peer, server, peer.getId(),
clusterId, walProvider.getWALFileLengthProvider(), new MetricsSource(peer.getId()));
if (addListener) {
walProvider.addWALActionsListener(new WALActionsListener() {
@Override
public void postLogRoll(Path oldPath, Path newPath) throws IOException {
crs.enqueueLog(newPath);
}
});
} else {
// This is a problem. We'll have a ReplicationSource but no listener on hbase:meta WALs
// so nothing will be replicated.
LOG.error("Did not install WALActionsListener creating CatalogReplicationSource!");
}
// Start this ReplicationSource.
return crs.startup();
}
}


@ -33,7 +33,6 @@ import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;
@ -260,8 +259,7 @@ public class ReplicationSourceShipper extends Thread {
// position and the file will be removed soon in cleanOldLogs.
if (batch.isEndOfFile() || !batch.getLastWalPath().equals(currentPath) ||
batch.getLastWalPosition() != currentPosition) {
source.getSourceManager().logPositionAndCleanOldLogs(source.getQueueId(),
source.isRecovered(), batch);
source.logPositionAndCleanOldLogs(batch);
updated = true;
}
// if end of file is true, then we can just skip to the next file in queue.


@ -37,7 +37,6 @@ import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.cache.Cache;
import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
import org.apache.hbase.thirdparty.com.google.common.cache.CacheLoader;
@ -140,7 +139,7 @@ class SerialReplicationChecker {
public SerialReplicationChecker(Configuration conf, ReplicationSource source) {
this.peerId = source.getPeerId();
this.storage = source.getQueueStorage();
this.storage = source.getReplicationQueueStorage();
this.conn = source.getServer().getConnection();
this.waitTimeMs =
conf.getLong(REPLICATION_SERIALLY_WAITING_KEY, REPLICATION_SERIALLY_WAITING_DEFAULT);


@ -25,6 +25,9 @@ import org.apache.yetus.audience.InterfaceAudience;
/**
* Used by replication to prevent replicating unacked log entries. See
* https://issues.apache.org/jira/browse/HBASE-14004 for more details.
* WALFileLengthProvider exists because we do not want to reference WALFactory and WALProvider
* directly in the replication code so in the future it will be easier to decouple them.
* Each walProvider will have its own implementation.
*/
@InterfaceAudience.Private
@FunctionalInterface


@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@ -22,6 +22,8 @@ import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ReplicationPeerNotFoundException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
@ -57,7 +59,15 @@ public class ServerRegionReplicaUtil extends RegionReplicaUtil {
public static final String REGION_REPLICA_REPLICATION_CONF_KEY
= "hbase.region.replica.replication.enabled";
private static final boolean DEFAULT_REGION_REPLICA_REPLICATION = false;
private static final String REGION_REPLICA_REPLICATION_PEER = "region_replica_replication";
public static final String REGION_REPLICA_REPLICATION_PEER = "region_replica_replication";
/**
* Same as for {@link #REGION_REPLICA_REPLICATION_CONF_KEY} but for catalog replication.
*/
public static final String REGION_REPLICA_REPLICATION_CATALOG_CONF_KEY
= "hbase.region.replica.replication.catalog.enabled";
private static final boolean DEFAULT_REGION_REPLICA_REPLICATION_CATALOG = false;
/**
* Enables or disables refreshing store files of secondary region replicas when the memory is
@ -116,7 +126,6 @@ public class ServerRegionReplicaUtil extends RegionReplicaUtil {
* files of the primary region, so an HFileLink is used to construct the StoreFileInfo. This
* way ensures that the secondary will be able to continue reading the store files even if
* they are moved to archive after compaction
* @throws IOException
*/
public static StoreFileInfo getStoreFileInfo(Configuration conf, FileSystem fs,
RegionInfo regionInfo, RegionInfo regionInfoForFs, String familyName, Path path)
@ -153,8 +162,7 @@ public class ServerRegionReplicaUtil extends RegionReplicaUtil {
}
/**
* Create replication peer for replicating to region replicas if needed.
* <p/>
* Create replication peer for replicating user-space Region Read Replicas.
* This methods should only be called at master side.
*/
public static void setupRegionReplicaReplication(MasterServices services)
@ -174,16 +182,42 @@ public class ServerRegionReplicaUtil extends RegionReplicaUtil {
services.addReplicationPeer(REGION_REPLICA_REPLICATION_PEER, peerConfig, true);
}
public static boolean isRegionReplicaReplicationEnabled(Configuration conf) {
return conf.getBoolean(REGION_REPLICA_REPLICATION_CONF_KEY,
DEFAULT_REGION_REPLICA_REPLICATION);
/**
* @return True if Region Read Replica is enabled for <code>tn</code> (whether hbase:meta or
* user-space tables).
*/
public static boolean isRegionReplicaReplicationEnabled(Configuration conf, TableName tn) {
return isMetaRegionReplicaReplicationEnabled(conf, tn) ||
isRegionReplicaReplicationEnabled(conf);
}
/**
* @return True if Region Read Replica is enabled for user-space tables.
*/
private static boolean isRegionReplicaReplicationEnabled(Configuration conf) {
return conf.getBoolean(REGION_REPLICA_REPLICATION_CONF_KEY, DEFAULT_REGION_REPLICA_REPLICATION);
}
/**
* @return True if hbase:meta Region Read Replica is enabled.
*/
public static boolean isMetaRegionReplicaReplicationEnabled(Configuration conf, TableName tn) {
return TableName.isMetaTableName(tn) &&
conf.getBoolean(REGION_REPLICA_REPLICATION_CATALOG_CONF_KEY,
DEFAULT_REGION_REPLICA_REPLICATION_CATALOG);
}
/**
* @return True if wait for primary to flush is enabled for user-space tables.
*/
public static boolean isRegionReplicaWaitForPrimaryFlushEnabled(Configuration conf) {
return conf.getBoolean(REGION_REPLICA_WAIT_FOR_PRIMARY_FLUSH_CONF_KEY,
DEFAULT_REGION_REPLICA_WAIT_FOR_PRIMARY_FLUSH);
}
/**
* @return True if we are to refresh user-space hfiles in Region Read Replicas.
*/
public static boolean isRegionReplicaStoreFileRefreshEnabled(Configuration conf) {
return conf.getBoolean(REGION_REPLICA_STORE_FILE_REFRESH,
DEFAULT_REGION_REPLICA_STORE_FILE_REFRESH);
@ -194,11 +228,4 @@ public class ServerRegionReplicaUtil extends RegionReplicaUtil {
DEFAULT_REGION_REPLICA_STORE_FILE_REFRESH_MEMSTORE_MULTIPLIER);
}
/**
* Return the peer id used for replicating to secondary region replicas
*/
public static String getReplicationPeerId() {
return REGION_REPLICA_REPLICATION_PEER;
}
}


@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@ -67,7 +67,7 @@ public class WALFactory {
/**
* Maps between configuration names for providers and implementation classes.
*/
static enum Providers {
enum Providers {
defaultProvider(AsyncFSWALProvider.class),
filesystem(FSHLogProvider.class),
multiwal(RegionGroupingProvider.class),
@ -256,8 +256,12 @@ public class WALFactory {
return provider.getWALs();
}
@VisibleForTesting
WALProvider getMetaProvider() throws IOException {
/**
* Called when we lazily create a hbase:meta WAL OR from ReplicationSourceManager ahead of
* creating the first hbase:meta WAL so we can register a listener.
* @see #getMetaWALProvider()
*/
public WALProvider getMetaProvider() throws IOException {
for (;;) {
WALProvider provider = this.metaProvider.get();
if (provider != null) {
@ -306,7 +310,6 @@ public class WALFactory {
* to reopen it multiple times, use {@link WAL.Reader#reset()} instead of this method
* then just seek back to the last known good position.
* @return A WAL reader. Close when done with it.
* @throws IOException
*/
public Reader createReader(final FileSystem fs, final Path path,
CancelableProgressable reporter) throws IOException {
@ -490,6 +493,10 @@ public class WALFactory {
return this.provider;
}
/**
* @return Current metaProvider... may be null if not yet initialized.
* @see #getMetaProvider()
*/
public final WALProvider getMetaWALProvider() {
return this.metaProvider.get();
}


@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@ -70,8 +70,9 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface {
}
@Override
public void startup() {
public ReplicationSourceInterface startup() {
startup.set(true);
return this;
}
public boolean isStartup() {
@ -160,4 +161,9 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface {
public ServerName getServerWALsBelongTo() {
return null;
}
@Override
public ReplicationQueueStorage getReplicationQueueStorage() {
return null;
}
}


@ -0,0 +1,319 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication.regionserver;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Tests RegionReplicaReplicationEndpoint class for hbase:meta by setting up region replicas and
* verifying async wal replication replays the edits to the secondary region in various scenarios.
* @see TestRegionReplicaReplicationEndpoint
*/
@Category({LargeTests.class})
public class TestMetaRegionReplicaReplicationEndpoint {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestMetaRegionReplicaReplicationEndpoint.class);
private static final Logger LOG =
LoggerFactory.getLogger(TestMetaRegionReplicaReplicationEndpoint.class);
private static final int NB_SERVERS = 3;
private static final HBaseTestingUtility HTU = new HBaseTestingUtility();
@Rule
public TestName name = new TestName();
@Before
public void before() throws Exception {
Configuration conf = HTU.getConfiguration();
conf.setFloat("hbase.regionserver.logroll.multiplier", 0.0003f);
conf.setInt("replication.source.size.capacity", 10240);
conf.setLong("replication.source.sleepforretries", 100);
conf.setInt("hbase.regionserver.maxlogs", 10);
conf.setLong("hbase.master.logcleaner.ttl", 10);
conf.setInt("zookeeper.recovery.retry", 1);
conf.setInt("zookeeper.recovery.retry.intervalmill", 10);
conf.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
conf.setInt("replication.stats.thread.period.seconds", 5);
conf.setBoolean("hbase.tests.use.shortcircuit.reads", false);
conf.setInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER, 1);
// Enable hbase:meta replication.
conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CATALOG_CONF_KEY, true);
// Set hbase:meta replicas to be 3.
conf.setInt(HConstants.META_REPLICAS_NUM, NB_SERVERS);
HTU.startMiniCluster(NB_SERVERS);
HTU.waitFor(30000,
() -> HTU.getMiniHBaseCluster().getRegions(TableName.META_TABLE_NAME).size() >= NB_SERVERS);
}
@After
public void after() throws Exception {
HTU.shutdownMiniCluster();
}
/**
* Assert that the ReplicationSource for hbase:meta gets created when hbase:meta is opened.
*/
@Test
public void testHBaseMetaReplicationSourceCreatedOnOpen()
throws IOException, InterruptedException {
MiniHBaseCluster cluster = HTU.getMiniHBaseCluster();
HRegionServer hrs = cluster.getRegionServer(cluster.getServerHoldingMeta());
assertTrue(isMetaRegionReplicaReplicationSource(hrs));
// Now move the hbase:meta and make sure the ReplicationSource is in both places.
HRegionServer hrsOther = null;
for (int i = 0; i < cluster.getNumLiveRegionServers(); i++) {
hrsOther = cluster.getRegionServer(i);
if (hrsOther.getServerName().equals(hrs.getServerName())) {
hrsOther = null;
continue;
}
break;
}
assertNotNull(hrsOther);
assertFalse(isMetaRegionReplicaReplicationSource(hrsOther));
Region meta = null;
for (Region region: hrs.getOnlineRegionsLocalContext()) {
if (region.getRegionInfo().isMetaRegion()) {
meta = region;
break;
}
}
assertNotNull(meta);
HTU.moveRegionAndWait(meta.getRegionInfo(), hrsOther.getServerName());
// Assert that there is a ReplicationSource in both places now.
assertTrue(isMetaRegionReplicaReplicationSource(hrs));
assertTrue(isMetaRegionReplicaReplicationSource(hrsOther));
}
/**
* @return Whether the special hbase:meta region replica replication source is in place on <code>hrs</code>
*/
private boolean isMetaRegionReplicaReplicationSource(HRegionServer hrs) {
return hrs.getReplicationSourceService().getReplicationManager().
catalogReplicationSource.get() != null;
}
/**
* Test meta region replica replication. Create some tables and see if replicas pick up the
* additions.
*/
@Test
public void testHBaseMetaReplicates() throws Exception {
try (Table table = HTU.createTable(TableName.valueOf(this.name.getMethodName() + "_0"),
HConstants.CATALOG_FAMILY,
Arrays.copyOfRange(HBaseTestingUtility.KEYS, 1, HBaseTestingUtility.KEYS.length))) {
verifyReplication(TableName.META_TABLE_NAME, NB_SERVERS, getMetaCells(table.getName()));
}
try (Table table = HTU.createTable(TableName.valueOf(this.name.getMethodName() + "_1"),
HConstants.CATALOG_FAMILY,
Arrays.copyOfRange(HBaseTestingUtility.KEYS, 1, HBaseTestingUtility.KEYS.length))) {
verifyReplication(TableName.META_TABLE_NAME, NB_SERVERS, getMetaCells(table.getName()));
// Try delete.
HTU.deleteTableIfAny(table.getName());
verifyDeletedReplication(TableName.META_TABLE_NAME, NB_SERVERS, table.getName());
}
}
/**
* Replicas come online after primary.
*/
private void waitForMetaReplicasToOnline() throws IOException {
final RegionLocator regionLocator =
HTU.getConnection().getRegionLocator(TableName.META_TABLE_NAME);
HTU.waitFor(10000,
// getRegionLocations returns an entry for each replica but if unassigned, entry is null.
// Pass reload to force us to skip cache else it just keeps returning default.
() -> regionLocator.getRegionLocations(HConstants.EMPTY_START_ROW, true).stream().
filter(Objects::nonNull).count() >= NB_SERVERS);
List<HRegionLocation> locations = regionLocator.getRegionLocations(HConstants.EMPTY_START_ROW);
LOG.info("Found locations {}", locations);
assertEquals(NB_SERVERS, locations.size());
}
/**
* Scan hbase:meta for <code>tableName</code> content.
*/
private List<Result> getMetaCells(TableName tableName) throws IOException {
final List<Result> results = new ArrayList<>();
MetaTableAccessor.Visitor visitor = new MetaTableAccessor.Visitor() {
@Override public boolean visit(Result r) throws IOException {
results.add(r);
return true;
}
};
MetaTableAccessor.scanMetaForTableRegions(HTU.getConnection(), visitor, tableName);
return results;
}
/**
* @return All Regions for tableName including Replicas.
*/
private Region [] getAllRegions(TableName tableName, int replication) {
final Region[] regions = new Region[replication];
for (int i = 0; i < NB_SERVERS; i++) {
HRegionServer rs = HTU.getMiniHBaseCluster().getRegionServer(i);
List<HRegion> onlineRegions = rs.getRegions(tableName);
for (HRegion region : onlineRegions) {
regions[region.getRegionInfo().getReplicaId()] = region;
}
}
for (Region region : regions) {
assertNotNull(region);
}
return regions;
}
/**
* Verify that when a Table is deleted from the primary, there are no references left in the
* replicas (because they get the deletes of the table rows too).
*/
private void verifyDeletedReplication(TableName tableName, int regionReplication,
final TableName deletedTableName) {
final Region[] regions = getAllRegions(tableName, regionReplication);
// Start count at '1' so we skip default, primary replica and only look at secondaries.
for (int i = 1; i < regionReplication; i++) {
final Region region = regions[i];
// wait until all the data is replicated to all secondary regions
Waiter.waitFor(HTU.getConfiguration(), 30000, 1000, new Waiter.Predicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
LOG.info("Verifying replication for region replica {}", region.getRegionInfo());
try (RegionScanner rs = region.getScanner(new Scan())) {
List<Cell> cells = new ArrayList<>();
while (rs.next(cells)) {
continue;
}
return doesNotContain(cells, deletedTableName);
} catch(Throwable ex) {
LOG.warn("Verification from secondary region is not complete yet", ex);
// still wait
return false;
}
}
});
}
}
/**
* Cells are from hbase:meta replica so will start w/ 'tableName,'; i.e. the tablename followed
* by HConstants.DELIMITER. Make sure the deleted table is no longer present in passed
* <code>cells</code>.
*/
private boolean doesNotContain(List<Cell> cells, TableName tableName) {
for (Cell cell: cells) {
String row = Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
if (row.startsWith(tableName.toString() + HConstants.DELIMITER)) {
return false;
}
}
return true;
}
/**
* Verify Replicas have results (exactly).
*/
private void verifyReplication(TableName tableName, int regionReplication,
List<Result> contains) {
final Region[] regions = getAllRegions(tableName, regionReplication);
// Start count at '1' so we skip default, primary replica and only look at secondaries.
for (int i = 1; i < regionReplication; i++) {
final Region region = regions[i];
// wait until all the data is replicated to all secondary regions
Waiter.waitFor(HTU.getConfiguration(), 30000, 1000, new Waiter.Predicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
LOG.info("Verifying replication for region replica {}", region.getRegionInfo());
try (RegionScanner rs = region.getScanner(new Scan())) {
List<Cell> cells = new ArrayList<>();
while (rs.next(cells)) {
continue;
}
return contains(contains, cells);
} catch(Throwable ex) {
LOG.warn("Verification from secondary region is not complete yet", ex);
// still wait
return false;
}
}
});
}
}
/**
* Presumes sorted Cells. Verify that <code>cells</code> contains at least the Cells in <code>contains</code>.
*/
static boolean contains(List<Result> contains, List<Cell> cells) throws IOException {
CellScanner containsScanner = CellUtil.createCellScanner(contains);
CellScanner cellsScanner = CellUtil.createCellScanner(cells);
int matches = 0;
int count = 0;
while (containsScanner.advance()) {
while (cellsScanner.advance()) {
count++;
LOG.info("{} {}", containsScanner.current(), cellsScanner.current());
if (containsScanner.current().equals(cellsScanner.current())) {
matches++;
break;
}
}
}
return !containsScanner.advance() && matches >= 1 && count >= matches && count == cells.size();
}
}
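
Reviewer note on the helpers above: the row-key convention doesNotContain() relies on is just the
table name followed by HConstants.DELIMITER as the prefix of an hbase:meta row key. A minimal
standalone sketch of that prefix check follows; the row key and table name are made up for
illustration, and it assumes HConstants.DELIMITER is declared as the int ',' (hence the cast):

import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;

public class MetaRowPrefixSketch {
  public static void main(String[] args) {
    // hbase:meta row keys look like "<table>,<startKey>,<regionId>.<encodedName>."
    String metaRow = "testtable,,1600000000000.0123456789abcdef0123456789abcdef.";
    TableName tableName = TableName.valueOf("testtable");
    // Cast to char: concatenating the raw int would yield a prefix ending in "44", not ",".
    String prefix = tableName.toString() + (char) HConstants.DELIMITER;
    System.out.println(metaRow.startsWith(prefix)); // prints: true
  }
}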

View File

@@ -1,4 +1,4 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
@@ -20,12 +20,14 @@ package org.apache.hadoop.hbase.replication.regionserver;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
@@ -42,6 +44,7 @@ import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.ReplicationPeerNotFoundException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
@@ -51,6 +54,7 @@ import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.testclassification.FlakeyTests;
@@ -69,6 +73,7 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -124,91 +129,97 @@ public class TestRegionReplicaReplicationEndpoint {
  public void testRegionReplicaReplicationPeerIsCreated() throws IOException, ReplicationException {
    // create a table with region replicas. Check whether the replication peer is created
    // and replication started.
    try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
        Admin admin = connection.getAdmin()) {
      String peerId = ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_PEER;
      ReplicationPeerConfig peerConfig = null;
      try {
        peerConfig = admin.getReplicationPeerConfig(peerId);
      } catch (ReplicationPeerNotFoundException e) {
        LOG.warn("Region replica replication peer id=" + peerId + " not exist", e);
      }
      if (peerConfig != null) {
        admin.removeReplicationPeer(peerId);
        peerConfig = null;
      }
      HTableDescriptor htd = HTU.createTableDescriptor(
        "testReplicationPeerIsCreated_no_region_replicas");
      HTU.getAdmin().createTable(htd);
      try {
        peerConfig = admin.getReplicationPeerConfig(peerId);
        fail("Should throw ReplicationException, because replication peer id=" + peerId
          + " not exist");
      } catch (ReplicationPeerNotFoundException e) {
      }
      assertNull(peerConfig);
      htd = HTU.createTableDescriptor("testReplicationPeerIsCreated");
      htd.setRegionReplication(2);
      HTU.getAdmin().createTable(htd);
      // assert peer configuration is correct
      peerConfig = admin.getReplicationPeerConfig(peerId);
      assertNotNull(peerConfig);
      assertEquals(peerConfig.getClusterKey(),
        ZKConfig.getZooKeeperClusterKey(HTU.getConfiguration()));
      assertEquals(RegionReplicaReplicationEndpoint.class.getName(),
        peerConfig.getReplicationEndpointImpl());
    }
  }
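  // Note for reviewers: the peer auto-creation verified above is triggered simply by a table
  // having region replicas. A minimal sketch of such a table definition with the 2.x builder
  // API (table and family names hypothetical, not part of this patch):
  //
  //   admin.createTable(TableDescriptorBuilder.newBuilder(TableName.valueOf("example"))
  //     .setColumnFamily(ColumnFamilyDescriptorBuilder.of("f"))
  //     .setRegionReplication(2) // one primary plus one secondary replica per region
  //     .build());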
  @Test
  public void testRegionReplicaReplicationPeerIsCreatedForModifyTable() throws Exception {
    // modify a table by adding region replicas. Check whether the replication peer is created
    // and replication started.
    try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
        Admin admin = connection.getAdmin()) {
      String peerId = ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_PEER;
      ReplicationPeerConfig peerConfig = null;
      try {
        peerConfig = admin.getReplicationPeerConfig(peerId);
      } catch (ReplicationPeerNotFoundException e) {
        LOG.warn("Region replica replication peer id=" + peerId + " not exist", e);
      }
      if (peerConfig != null) {
        admin.removeReplicationPeer(peerId);
        peerConfig = null;
      }
      HTableDescriptor htd =
        HTU.createTableDescriptor("testRegionReplicaReplicationPeerIsCreatedForModifyTable");
      HTU.getAdmin().createTable(htd);
      // assert that replication peer is not created yet
      try {
        peerConfig = admin.getReplicationPeerConfig(peerId);
        fail("Should throw ReplicationException, because replication peer id=" + peerId
          + " not exist");
      } catch (ReplicationPeerNotFoundException e) {
      }
      assertNull(peerConfig);
      HTU.getAdmin().disableTable(htd.getTableName());
      htd.setRegionReplication(2);
      HTU.getAdmin().modifyTable(htd.getTableName(), htd);
      HTU.getAdmin().enableTable(htd.getTableName());
      // assert peer configuration is correct
      peerConfig = admin.getReplicationPeerConfig(peerId);
      assertNotNull(peerConfig);
      assertEquals(peerConfig.getClusterKey(),
        ZKConfig.getZooKeeperClusterKey(HTU.getConfiguration()));
      assertEquals(RegionReplicaReplicationEndpoint.class.getName(),
        peerConfig.getReplicationEndpointImpl());
      admin.close();
    }
  }

  public void testRegionReplicaReplication(int regionReplication) throws Exception {
@@ -405,8 +416,7 @@ public class TestRegionReplicaReplicationEndpoint {
    HTU.getAdmin().createTable(htd);
    // both tables are created, now pause replication
    HTU.getAdmin().disableReplicationPeer(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_PEER);
    // now that the replication is disabled, write to the table to be dropped, then drop the table.
@@ -450,6 +460,27 @@ public class TestRegionReplicaReplicationEndpoint {
    assertEquals(2, skippedEdits.get());
    HRegionServer rs = HTU.getMiniHBaseCluster().getRegionServer(0);
    MetricsSource metrics = mock(MetricsSource.class);
    ReplicationEndpoint.Context ctx =
      new ReplicationEndpoint.Context(HTU.getConfiguration(), HTU.getConfiguration(),
        HTU.getTestFileSystem(), ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_PEER,
        UUID.fromString(rs.getClusterId()), rs.getReplicationSourceService()
          .getReplicationManager().getReplicationPeers()
          .getPeer(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_PEER),
        metrics, rs.getTableDescriptors(), rs);
    RegionReplicaReplicationEndpoint rrpe = new RegionReplicaReplicationEndpoint();
    rrpe.init(ctx);
    rrpe.start();
    ReplicationEndpoint.ReplicateContext repCtx = new ReplicationEndpoint.ReplicateContext();
    repCtx.setEntries(Lists.newArrayList(entry, entry));
    assertTrue(rrpe.replicate(repCtx));
    /* Come back here. There is a difference in how counting is done here and in the master
       branch. St.Ack
    Mockito.verify(metrics, Mockito.times(1)).
      incrLogEditsFiltered(Mockito.eq(2L));
    */
    rrpe.stop();
    if (disableReplication) {
      // enable replication again so that we can verify replication
      HTU.getAdmin().disableTable(toBeDisabledTable); // disable the table
@@ -465,12 +496,11 @@ public class TestRegionReplicaReplicationEndpoint {
      HTU.loadNumericRows(table, HBaseTestingUtility.fam1, 0, 1000);
      // now enable the replication
      HTU.getAdmin().enableReplicationPeer(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_PEER);
      verifyReplication(tableName, regionReplication, 0, 1000);
    } finally {
      table.close();
      rl.close();
      tableToBeDisabled.close();
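
For reference, the replacement pattern in the hunks above: peer administration moves off the
deprecated ReplicationAdmin onto the equivalent methods of org.apache.hadoop.hbase.client.Admin.
A minimal sketch, assuming a running cluster reachable from the default configuration; the
standalone main() below is illustrative only, not part of the patch:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;

public class PeerAdminSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    try (Connection connection = ConnectionFactory.createConnection(conf);
        Admin admin = connection.getAdmin()) {
      String peerId = ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_PEER;
      // Pause, then resume, the region-replica replication peer.
      admin.disableReplicationPeer(peerId);
      admin.enableReplicationPeer(peerId);
      // Inspect the peer; throws ReplicationPeerNotFoundException if it does not exist.
      ReplicationPeerConfig peerConfig = admin.getReplicationPeerConfig(peerId);
      System.out.println(peerConfig.getReplicationEndpointImpl());
    }
  }
}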

View File

@@ -320,7 +320,7 @@ public abstract class TestReplicationSourceManager {
    wal.rollWriter();
    manager.logPositionAndCleanOldLogs(manager.getSources().get(0),
      new WALEntryBatch(0, manager.getSources().get(0).getCurrentPath()));
    wal.appendData(hri,

View File

@@ -109,7 +109,7 @@ public class TestSerialReplicationChecker {
  public void setUp() throws IOException {
    ReplicationSource source = mock(ReplicationSource.class);
    when(source.getPeerId()).thenReturn(PEER_ID);
    when(source.getReplicationQueueStorage()).thenReturn(QUEUE_STORAGE);
    conn = mock(Connection.class);
    when(conn.isClosed()).thenReturn(false);
    doAnswer(new Answer<Table>() {