HBASE-24817 Allow configuring WALEntry filters on ReplicationSource (#2198)

Allow specifying base WALEntry filter on construction of
ReplicationSource. Add means of being able to filter WALs by name.

hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
 Add constructor that allows passing a predicate for filtering *in* WALs
 and a list of filters for filtering *out* WALEntries. The latter was
 hardcoded to filter out system-table WALEntries. The former did not
 exist but we'll need it if Replication takes in more than just the
 default Provider.

Signed-off-by Anoop Sam John <anoopsamjohn@apache.org>
Signed-off-by: Viraj Jasani <vjasani@apache.org>
This commit is contained in:
Michael Stack 2020-08-06 09:29:08 -07:00 committed by stack
parent 8003a15835
commit 0ae125ac6f
5 changed files with 248 additions and 117 deletions

View File

@ -1,4 +1,4 @@
/** /*
* Licensed to the Apache Software Foundation (ASF) under one * Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file * or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information * distributed with this work for additional information
@ -17,20 +17,16 @@
*/ */
package org.apache.hadoop.hbase.replication; package org.apache.hadoop.hbase.replication;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.yetus.audience.InterfaceAudience;
/** /**
* Skips WAL edits for all System tables including META * Skips WAL edits for all System tables including hbase:meta.
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
public class SystemTableWALEntryFilter implements WALEntryFilter { public class SystemTableWALEntryFilter implements WALEntryFilter {
@Override @Override
public Entry filter(Entry entry) { public Entry filter(Entry entry) {
if (entry.getKey().getTableName().isSystemTable()) { return entry.getKey().getTableName().isSystemTable()? null: entry;
return null;
}
return entry;
} }
} }

View File

@ -1,4 +1,4 @@
/** /*
* Licensed to the Apache Software Foundation (ASF) under one * Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file * or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information * distributed with this work for additional information
@ -16,10 +16,9 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.hadoop.hbase.replication; package org.apache.hadoop.hbase.replication;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.yetus.audience.InterfaceAudience;
/** /**
* A Filter for WAL entries before being sent over to replication. Multiple * A Filter for WAL entries before being sent over to replication. Multiple
@ -34,7 +33,6 @@ import org.apache.hadoop.hbase.wal.WAL.Entry;
*/ */
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION) @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
public interface WALEntryFilter { public interface WALEntryFilter {
/** /**
* <p> * <p>
* Applies the filter, possibly returning a different Entry instance. If null is returned, the * Applies the filter, possibly returning a different Entry instance. If null is returned, the
@ -49,5 +47,5 @@ public interface WALEntryFilter {
* @return a (possibly modified) Entry to use. Returning null or an entry with no cells will cause * @return a (possibly modified) Entry to use. Returning null or an entry with no cells will cause
* the entry to be skipped for replication. * the entry to be skipped for replication.
*/ */
public Entry filter(Entry entry); Entry filter(Entry entry);
} }

View File

@ -1,4 +1,4 @@
/** /*
* Licensed to the Apache Software Foundation (ASF) under one * Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file * or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information * distributed with this work for additional information
@ -18,12 +18,12 @@
package org.apache.hadoop.hbase.replication.regionserver; package org.apache.hadoop.hbase.replication.regionserver;
import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.getArchivedLogPath; import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.getArchivedLogPath;
import java.io.FileNotFoundException; import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.lang.reflect.InvocationTargetException; import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Collections;
import java.util.Comparator; import java.util.Comparator;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
@ -36,6 +36,7 @@ import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Predicate;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
@ -63,16 +64,15 @@ import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists; import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
/** /**
* Class that handles the source of a replication stream. * Class that handles the source of a replication stream.
* Currently does not handle more than 1 slave * Currently does not handle more than 1 slave cluster.
* For each slave cluster it selects a random number of peers * For each slave cluster it selects a random number of peers
* using a replication ratio. For example, if replication ration = 0.1 * using a replication ratio. For example, if replication ration = 0.1
* and slave cluster has 100 region servers, 10 will be selected. * and slave cluster has 100 region servers, 10 will be selected.
@ -119,8 +119,12 @@ public class ReplicationSource implements ReplicationSourceInterface {
private int logQueueWarnThreshold; private int logQueueWarnThreshold;
// ReplicationEndpoint which will handle the actual replication // ReplicationEndpoint which will handle the actual replication
private volatile ReplicationEndpoint replicationEndpoint; private volatile ReplicationEndpoint replicationEndpoint;
// A filter (or a chain of filters) for the WAL entries.
/**
* A filter (or a chain of filters) for WAL entries; filters out edits.
*/
protected volatile WALEntryFilter walEntryFilter; protected volatile WALEntryFilter walEntryFilter;
// throttler // throttler
private ReplicationThrottler throttler; private ReplicationThrottler throttler;
private long defaultBandwidth; private long defaultBandwidth;
@ -139,6 +143,39 @@ public class ReplicationSource implements ReplicationSourceInterface {
private Thread initThread; private Thread initThread;
/**
* WALs to replicate.
* Predicate that returns 'true' for WALs to replicate and false for WALs to skip.
*/
private final Predicate<Path> filterInWALs;
/**
* Base WALEntry filters for this class. Unmodifiable. Set on construction.
* Filters *out* edits we do not want replicated, passed on to replication endpoints.
* This is the basic set. Down in #initializeWALEntryFilter this set is added to the end of
* the WALEntry filter chain. These are put after those that we pick up from the configured
* endpoints and other machinations to create the final {@link #walEntryFilter}.
* @see WALEntryFilter
*/
private final List<WALEntryFilter> baseFilterOutWALEntries;
ReplicationSource() {
// Default, filters *in* all WALs but meta WALs & filters *out* all WALEntries of System Tables.
this(p -> !AbstractFSWALProvider.isMetaFile(p),
Lists.newArrayList(new SystemTableWALEntryFilter()));
}
/**
* @param replicateWAL Pass a filter to run against WAL Path; filter *in* WALs to Replicate;
* i.e. return 'true' if you want to replicate the content of the WAL.
* @param baseFilterOutWALEntries Base set of filters you want applied always; filters *out*
* WALEntries so they never make it out of this ReplicationSource.
*/
ReplicationSource(Predicate<Path> replicateWAL, List<WALEntryFilter> baseFilterOutWALEntries) {
this.filterInWALs = replicateWAL;
this.baseFilterOutWALEntries = Collections.unmodifiableList(baseFilterOutWALEntries);
}
/** /**
* Instantiation method used by region servers * Instantiation method used by region servers
* @param conf configuration to use * @param conf configuration to use
@ -192,30 +229,34 @@ public class ReplicationSource implements ReplicationSourceInterface {
} }
@Override @Override
public void enqueueLog(Path log) { public void enqueueLog(Path wal) {
String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(log.getName()); if (!this.filterInWALs.test(wal)) {
LOG.trace("NOT replicating {}", wal);
return;
}
String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(wal.getName());
PriorityBlockingQueue<Path> queue = queues.get(logPrefix); PriorityBlockingQueue<Path> queue = queues.get(logPrefix);
if (queue == null) { if (queue == null) {
queue = new PriorityBlockingQueue<>(queueSizePerGroup, new LogsComparator()); queue = new PriorityBlockingQueue<>(queueSizePerGroup, new LogsComparator());
// make sure that we do not use an empty queue when setting up a ReplicationSource, otherwise // make sure that we do not use an empty queue when setting up a ReplicationSource, otherwise
// the shipper may quit immediately // the shipper may quit immediately
queue.put(log); queue.put(wal);
queues.put(logPrefix, queue); queues.put(logPrefix, queue);
if (this.isSourceActive() && this.walEntryFilter != null) { if (this.isSourceActive() && this.walEntryFilter != null) {
// new wal group observed after source startup, start a new worker thread to track it // new wal group observed after source startup, start a new worker thread to track it
// notice: it's possible that log enqueued when this.running is set but worker thread // notice: it's possible that wal enqueued when this.running is set but worker thread
// still not launched, so it's necessary to check workerThreads before start the worker // still not launched, so it's necessary to check workerThreads before start the worker
tryStartNewShipper(logPrefix, queue); tryStartNewShipper(logPrefix, queue);
} }
} else { } else {
queue.put(log); queue.put(wal);
} }
if (LOG.isTraceEnabled()) { if (LOG.isTraceEnabled()) {
LOG.trace("{} Added log file {} to queue of source {}.", logPeerId(), logPrefix, LOG.trace("{} Added wal {} to queue of source {}.", logPeerId(), logPrefix,
this.replicationQueueInfo.getQueueId()); this.replicationQueueInfo.getQueueId());
} }
this.metrics.incrSizeOfLogQueue(); this.metrics.incrSizeOfLogQueue();
// This will log a warning for each new log that gets created above the warn threshold // This will wal a warning for each new wal that gets created above the warn threshold
int queueSize = queue.size(); int queueSize = queue.size();
if (queueSize > this.logQueueWarnThreshold) { if (queueSize > this.logQueueWarnThreshold) {
LOG.warn("{} WAL group {} queue size: {} exceeds value of " LOG.warn("{} WAL group {} queue size: {} exceeds value of "
@ -304,8 +345,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
private void initializeWALEntryFilter(UUID peerClusterId) { private void initializeWALEntryFilter(UUID peerClusterId) {
// get the WALEntryFilter from ReplicationEndpoint and add it to default filters // get the WALEntryFilter from ReplicationEndpoint and add it to default filters
ArrayList<WALEntryFilter> filters = List<WALEntryFilter> filters = new ArrayList<>(this.baseFilterOutWALEntries);
Lists.<WALEntryFilter> newArrayList(new SystemTableWALEntryFilter());
WALEntryFilter filterFromEndpoint = this.replicationEndpoint.getWALEntryfilter(); WALEntryFilter filterFromEndpoint = this.replicationEndpoint.getWALEntryfilter();
if (filterFromEndpoint != null) { if (filterFromEndpoint != null) {
filters.add(filterFromEndpoint); filters.add(filterFromEndpoint);
@ -400,6 +440,16 @@ public class ReplicationSource implements ReplicationSourceInterface {
: new ReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter, this); : new ReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter, this);
} }
/**
* Call after {@link #initializeWALEntryFilter(UUID)} else it will be null.
* @return The WAL Entry Filter Chain this ReplicationSource will use on WAL files filtering
* out WALEntry edits.
*/
@VisibleForTesting
WALEntryFilter getWalEntryFilter() {
return walEntryFilter;
}
protected final void uncaughtException(Thread t, Throwable e) { protected final void uncaughtException(Thread t, Throwable e) {
RSRpcServices.exitIfOOME(e); RSRpcServices.exitIfOOME(e);
LOG.error("Unexpected exception in {} currentPath={}", LOG.error("Unexpected exception in {} currentPath={}",
@ -622,7 +672,10 @@ public class ReplicationSource implements ReplicationSourceInterface {
} }
} }
if (clearMetrics) { if (clearMetrics) {
this.metrics.clear(); // Can be null in test context.
if (this.metrics != null) {
this.metrics.clear();
}
} }
} }
@ -647,10 +700,6 @@ public class ReplicationSource implements ReplicationSourceInterface {
return !this.server.isStopped() && this.sourceRunning; return !this.server.isStopped() && this.sourceRunning;
} }
public UUID getPeerClusterUUID(){
return this.clusterId;
}
/** /**
* Comparator used to compare logs together based on their start time * Comparator used to compare logs together based on their start time
*/ */

View File

@ -1,4 +1,4 @@
/** /*
* Licensed to the Apache Software Foundation (ASF) under one * Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file * or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information * distributed with this work for additional information
@ -45,7 +45,6 @@ import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability; import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists; import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
@ -307,7 +306,6 @@ public abstract class AbstractFSWALProvider<T extends AbstractFSWAL<?>> implemen
* Construct the directory name for all old WALs on a given server. The default old WALs dir looks * Construct the directory name for all old WALs on a given server. The default old WALs dir looks
* like: <code>hbase/oldWALs</code>. If you config hbase.separate.oldlogdir.by.regionserver to * like: <code>hbase/oldWALs</code>. If you config hbase.separate.oldlogdir.by.regionserver to
* true, it looks like <code>hbase//oldWALs/kalashnikov.att.net,61634,1486865297088</code>. * true, it looks like <code>hbase//oldWALs/kalashnikov.att.net,61634,1486865297088</code>.
* @param conf
* @param serverName Server name formatted as described in {@link ServerName} * @param serverName Server name formatted as described in {@link ServerName}
* @return the relative WAL directory name * @return the relative WAL directory name
*/ */
@ -413,11 +411,11 @@ public abstract class AbstractFSWALProvider<T extends AbstractFSWAL<?>> implemen
return isMetaFile(p.getName()); return isMetaFile(p.getName());
} }
/**
* @return True if String ends in {@link #META_WAL_PROVIDER_ID}
*/
public static boolean isMetaFile(String p) { public static boolean isMetaFile(String p) {
if (p != null && p.endsWith(META_WAL_PROVIDER_ID)) { return p != null && p.endsWith(META_WAL_PROVIDER_ID);
return true;
}
return false;
} }
public static boolean isArchivedLogFile(Path p) { public static boolean isArchivedLogFile(Path p) {
@ -460,12 +458,9 @@ public abstract class AbstractFSWALProvider<T extends AbstractFSWAL<?>> implemen
* @param path path to WAL file * @param path path to WAL file
* @param conf configuration * @param conf configuration
* @return WAL Reader instance * @return WAL Reader instance
* @throws IOException
*/ */
public static org.apache.hadoop.hbase.wal.WAL.Reader openReader(Path path, Configuration conf) public static org.apache.hadoop.hbase.wal.WAL.Reader openReader(Path path, Configuration conf)
throws IOException throws IOException {
{
long retryInterval = 2000; // 2 sec long retryInterval = 2000; // 2 sec
int maxAttempts = 30; int maxAttempts = 30;
int attempt = 0; int attempt = 0;

View File

@ -1,4 +1,4 @@
/** /*
* Licensed to the Apache Software Foundation (ASF) under one * Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file * or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information * distributed with this work for additional information
@ -16,13 +16,14 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.hadoop.hbase.replication.regionserver; package org.apache.hadoop.hbase.replication.regionserver;
import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.META_WAL_PROVIDER_ID;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull; import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.io.IOException; import java.io.IOException;
import java.util.OptionalLong; import java.util.OptionalLong;
import java.util.UUID;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.Future; import java.util.concurrent.Future;
@ -41,19 +42,14 @@ import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter; import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.Waiter.Predicate;
import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint; import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationPeer; import org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint; import org.apache.hadoop.hbase.replication.WALEntryFilter;
import org.apache.hadoop.hbase.replication.regionserver.RecoveredReplicationSource;
import org.apache.hadoop.hbase.replication.regionserver.RecoveredReplicationSourceShipper;
import org.apache.hadoop.hbase.replication.regionserver.Replication;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSource;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager;
import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests; import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
@ -63,7 +59,6 @@ import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKeyImpl; import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.hadoop.hbase.wal.WALProvider; import org.apache.hadoop.hbase.wal.WALProvider;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.ClassRule; import org.junit.ClassRule;
import org.junit.Test; import org.junit.Test;
@ -96,9 +91,13 @@ public class TestReplicationSource {
FS = TEST_UTIL.getDFSCluster().getFileSystem(); FS = TEST_UTIL.getDFSCluster().getFileSystem();
Path rootDir = TEST_UTIL.createRootDir(); Path rootDir = TEST_UTIL.createRootDir();
oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME); oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
if (FS.exists(oldLogDir)) FS.delete(oldLogDir, true); if (FS.exists(oldLogDir)) {
FS.delete(oldLogDir, true);
}
logDir = new Path(rootDir, HConstants.HREGION_LOGDIR_NAME); logDir = new Path(rootDir, HConstants.HREGION_LOGDIR_NAME);
if (FS.exists(logDir)) FS.delete(logDir, true); if (FS.exists(logDir)) {
FS.delete(logDir, true);
}
} }
@AfterClass @AfterClass
@ -108,16 +107,100 @@ public class TestReplicationSource {
TEST_UTIL.shutdownMiniDFSCluster(); TEST_UTIL.shutdownMiniDFSCluster();
} }
/**
* Test the default ReplicationSource skips queuing hbase:meta WAL files.
*/
@Test
public void testDefaultSkipsMetaWAL() throws IOException {
ReplicationSource rs = new ReplicationSource();
Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
conf.setInt("replication.source.maxretriesmultiplier", 1);
ReplicationPeer mockPeer = Mockito.mock(ReplicationPeer.class);
Mockito.when(mockPeer.getConfiguration()).thenReturn(conf);
Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L);
ReplicationPeerConfig peerConfig = Mockito.mock(ReplicationPeerConfig.class);
Mockito.when(peerConfig.getReplicationEndpointImpl()).
thenReturn(DoNothingReplicationEndpoint.class.getName());
Mockito.when(mockPeer.getPeerConfig()).thenReturn(peerConfig);
ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class);
Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong());
String queueId = "qid";
RegionServerServices rss =
TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1"));
rs.init(conf, null, manager, null, mockPeer, rss, queueId, null,
p -> OptionalLong.empty(), new MetricsSource(queueId));
try {
rs.startup();
assertTrue(rs.isSourceActive());
assertEquals(0, rs.getSourceMetrics().getSizeOfLogQueue());
rs.enqueueLog(new Path("a.1" + META_WAL_PROVIDER_ID));
assertEquals(0, rs.getSourceMetrics().getSizeOfLogQueue());
rs.enqueueLog(new Path("a.1"));
assertEquals(1, rs.getSourceMetrics().getSizeOfLogQueue());
} finally {
rs.terminate("Done");
rss.stop("Done");
}
}
/**
* Test that we filter out meta edits, etc.
*/
@Test
public void testWALEntryFilter() throws IOException {
// To get the fully constructed default WALEntryFilter, need to create a ReplicationSource
// instance and init it.
ReplicationSource rs = new ReplicationSource();
UUID uuid = UUID.randomUUID();
Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
ReplicationPeer mockPeer = Mockito.mock(ReplicationPeer.class);
Mockito.when(mockPeer.getConfiguration()).thenReturn(conf);
Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L);
ReplicationPeerConfig peerConfig = Mockito.mock(ReplicationPeerConfig.class);
Mockito.when(peerConfig.getReplicationEndpointImpl()).
thenReturn(DoNothingReplicationEndpoint.class.getName());
Mockito.when(mockPeer.getPeerConfig()).thenReturn(peerConfig);
ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class);
Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong());
String queueId = "qid";
RegionServerServices rss =
TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1"));
rs.init(conf, null, manager, null, mockPeer, rss, queueId,
uuid, p -> OptionalLong.empty(), new MetricsSource(queueId));
try {
rs.startup();
TEST_UTIL.waitFor(30000, () -> rs.getWalEntryFilter() != null);
WALEntryFilter wef = rs.getWalEntryFilter();
// Test non-system WAL edit.
WAL.Entry e = new WAL.Entry(new WALKeyImpl(HConstants.EMPTY_BYTE_ARRAY,
TableName.valueOf("test"), -1), new WALEdit());
assertTrue(wef.filter(e) == e);
// Test system WAL edit.
e = new WAL.Entry(
new WALKeyImpl(HConstants.EMPTY_BYTE_ARRAY, TableName.META_TABLE_NAME, -1),
new WALEdit());
assertNull(wef.filter(e));
} finally {
rs.terminate("Done");
rss.stop("Done");
}
}
/** /**
* Sanity check that we can move logs around while we are reading * Sanity check that we can move logs around while we are reading
* from them. Should this test fail, ReplicationSource would have a hard * from them. Should this test fail, ReplicationSource would have a hard
* time reading logs that are being archived. * time reading logs that are being archived.
*/ */
// This tests doesn't belong in here... it is not about ReplicationSource.
@Test @Test
public void testLogMoving() throws Exception{ public void testLogMoving() throws Exception{
Path logPath = new Path(logDir, "log"); Path logPath = new Path(logDir, "log");
if (!FS.exists(logDir)) FS.mkdirs(logDir); if (!FS.exists(logDir)) {
if (!FS.exists(oldLogDir)) FS.mkdirs(oldLogDir); FS.mkdirs(logDir);
}
if (!FS.exists(oldLogDir)) {
FS.mkdirs(oldLogDir);
}
WALProvider.Writer writer = WALFactory.createWALWriter(FS, logPath, WALProvider.Writer writer = WALFactory.createWALWriter(FS, logPath,
TEST_UTIL.getConfiguration()); TEST_UTIL.getConfiguration());
for(int i = 0; i < 3; i++) { for(int i = 0; i < 3; i++) {
@ -142,7 +225,7 @@ public class TestReplicationSource {
entry = reader.next(); entry = reader.next();
assertNotNull(entry); assertNotNull(entry);
entry = reader.next(); reader.next();
entry = reader.next(); entry = reader.next();
assertNull(entry); assertNull(entry);
@ -151,47 +234,31 @@ public class TestReplicationSource {
/** /**
* Tests that {@link ReplicationSource#terminate(String)} will timeout properly * Tests that {@link ReplicationSource#terminate(String)} will timeout properly
* Moved here from TestReplicationSource because doesn't need cluster.
*/ */
@Test @Test
public void testTerminateTimeout() throws Exception { public void testTerminateTimeout() throws Exception {
ReplicationSource source = new ReplicationSource(); ReplicationSource source = new ReplicationSource();
ReplicationEndpoint replicationEndpoint = new HBaseInterClusterReplicationEndpoint() { ReplicationEndpoint
@Override replicationEndpoint = new DoNothingReplicationEndpoint();
protected void doStart() { try {
notifyStarted(); replicationEndpoint.start();
} ReplicationPeer mockPeer = Mockito.mock(ReplicationPeer.class);
Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L);
@Override Configuration testConf = HBaseConfiguration.create();
protected void doStop() { testConf.setInt("replication.source.maxretriesmultiplier", 1);
// not calling notifyStopped() here causes the caller of stop() to get a Future that never ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class);
// completes Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong());
} source.init(testConf, null, manager, null, mockPeer, null, "testPeer",
}; null, p -> OptionalLong.empty(), null);
replicationEndpoint.start(); ExecutorService executor = Executors.newSingleThreadExecutor();
ReplicationPeer mockPeer = Mockito.mock(ReplicationPeer.class); Future<?> future = executor.submit(
Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L); () -> source.terminate("testing source termination"));
Configuration testConf = HBaseConfiguration.create(); long sleepForRetries = testConf.getLong("replication.source.sleepforretries", 1000);
testConf.setInt("replication.source.maxretriesmultiplier", 1); Waiter.waitFor(testConf, sleepForRetries * 2, (Waiter.Predicate<Exception>) future::isDone);
ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class); } finally {
Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong()); replicationEndpoint.stop();
source.init(testConf, null, manager, null, mockPeer, null, "testPeer", null, }
p -> OptionalLong.empty(), null);
ExecutorService executor = Executors.newSingleThreadExecutor();
Future<?> future = executor.submit(new Runnable() {
@Override
public void run() {
source.terminate("testing source termination");
}
});
long sleepForRetries = testConf.getLong("replication.source.sleepforretries", 1000);
Waiter.waitFor(testConf, sleepForRetries * 2, new Predicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
return future.isDone();
}
});
} }
/** /**
@ -211,10 +278,10 @@ public class TestReplicationSource {
HRegionServer serverA = cluster.getRegionServer(0); HRegionServer serverA = cluster.getRegionServer(0);
final ReplicationSourceManager managerA = final ReplicationSourceManager managerA =
((Replication) serverA.getReplicationSourceService()).getReplicationManager(); serverA.getReplicationSourceService().getReplicationManager();
HRegionServer serverB = cluster.getRegionServer(1); HRegionServer serverB = cluster.getRegionServer(1);
final ReplicationSourceManager managerB = final ReplicationSourceManager managerB =
((Replication) serverB.getReplicationSourceService()).getReplicationManager(); serverB.getReplicationSourceService().getReplicationManager();
final Admin admin = TEST_UTIL.getAdmin(); final Admin admin = TEST_UTIL.getAdmin();
final String peerId = "TestPeer"; final String peerId = "TestPeer";
@ -222,7 +289,7 @@ public class TestReplicationSource {
ReplicationPeerConfig.newBuilder().setClusterKey(TEST_UTIL_PEER.getClusterKey()).build()); ReplicationPeerConfig.newBuilder().setClusterKey(TEST_UTIL_PEER.getClusterKey()).build());
// Wait for replication sources to come up // Wait for replication sources to come up
Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() { Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
@Override public boolean evaluate() throws Exception { @Override public boolean evaluate() {
return !(managerA.getSources().isEmpty() || managerB.getSources().isEmpty()); return !(managerA.getSources().isEmpty() || managerB.getSources().isEmpty());
} }
}); });
@ -257,17 +324,11 @@ public class TestReplicationSource {
// 1. The serverB's normal queue // 1. The serverB's normal queue
// 2. serverA's recovered queue on serverB // 2. serverA's recovered queue on serverB
cluster.stopRegionServer(serverB.getServerName()); cluster.stopRegionServer(serverB.getServerName());
Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() { Waiter.waitFor(conf, 20000,
@Override public boolean evaluate() throws Exception { (Waiter.Predicate<Exception>) () -> managerC.getOldSources().size() == 2);
return managerC.getOldSources().size() == 2;
}
});
admin.enableReplicationPeer(peerId); admin.enableReplicationPeer(peerId);
Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() { Waiter.waitFor(conf, 20000,
@Override public boolean evaluate() throws Exception { (Waiter.Predicate<Exception>) () -> managerC.getOldSources().size() == 0);
return managerC.getOldSources().size() == 0;
}
});
} finally { } finally {
conf.set(HConstants.REGION_SERVER_IMPL, HRegionServer.class.getName()); conf.set(HConstants.REGION_SERVER_IMPL, HRegionServer.class.getName());
} }
@ -277,7 +338,7 @@ public class TestReplicationSource {
* Regionserver implementation that adds a delay on the graceful shutdown. * Regionserver implementation that adds a delay on the graceful shutdown.
*/ */
public static class ShutdownDelayRegionServer extends HRegionServer { public static class ShutdownDelayRegionServer extends HRegionServer {
public ShutdownDelayRegionServer(Configuration conf) throws IOException, InterruptedException { public ShutdownDelayRegionServer(Configuration conf) throws IOException {
super(conf); super(conf);
} }
@ -295,7 +356,39 @@ public class TestReplicationSource {
} }
} }
// Test HBASE-20497 /**
* Deadend Endpoint. Does nothing.
*/
public static class DoNothingReplicationEndpoint extends HBaseInterClusterReplicationEndpoint {
private final UUID uuid = UUID.randomUUID();
@Override public void init(Context context) throws IOException {
this.ctx = context;
}
@Override public WALEntryFilter getWALEntryfilter() {
return null;
}
@Override public synchronized UUID getPeerUUID() {
return this.uuid;
}
@Override
protected void doStart() {
notifyStarted();
}
@Override
protected void doStop() {
notifyStopped();
}
}
/**
* Test HBASE-20497
* Moved here from TestReplicationSource because doesn't need cluster.
*/
@Test @Test
public void testRecoveredReplicationSourceShipperGetPosition() throws Exception { public void testRecoveredReplicationSourceShipperGetPosition() throws Exception {
String walGroupId = "fake-wal-group-id"; String walGroupId = "fake-wal-group-id";
@ -310,14 +403,14 @@ public class TestReplicationSource {
Mockito.when(source.getServerWALsBelongTo()).thenReturn(deadServer); Mockito.when(source.getServerWALsBelongTo()).thenReturn(deadServer);
ReplicationQueueStorage storage = Mockito.mock(ReplicationQueueStorage.class); ReplicationQueueStorage storage = Mockito.mock(ReplicationQueueStorage.class);
Mockito.when(storage.getWALPosition(Mockito.eq(serverName), Mockito.any(), Mockito.any())) Mockito.when(storage.getWALPosition(Mockito.eq(serverName), Mockito.any(), Mockito.any()))
.thenReturn(1001L); .thenReturn(1001L);
Mockito.when(storage.getWALPosition(Mockito.eq(deadServer), Mockito.any(), Mockito.any())) Mockito.when(storage.getWALPosition(Mockito.eq(deadServer), Mockito.any(), Mockito.any()))
.thenReturn(-1L); .thenReturn(-1L);
Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
conf.setInt("replication.source.maxretriesmultiplier", -1); conf.setInt("replication.source.maxretriesmultiplier", -1);
RecoveredReplicationSourceShipper shipper = RecoveredReplicationSourceShipper shipper =
new RecoveredReplicationSourceShipper(conf, walGroupId, queue, source, storage); new RecoveredReplicationSourceShipper(conf, walGroupId, queue, source, storage);
Assert.assertEquals(1001L, shipper.getStartPosition()); assertEquals(1001L, shipper.getStartPosition());
conf.unset("replication.source.maxretriesmultiplier");
} }
} }