diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/SystemTableWALEntryFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/SystemTableWALEntryFilter.java index 1984436019a..3cda94a1c02 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/SystemTableWALEntryFilter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/SystemTableWALEntryFilter.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -17,20 +17,16 @@ */ package org.apache.hadoop.hbase.replication; - -import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.wal.WAL.Entry; +import org.apache.yetus.audience.InterfaceAudience; /** - * Skips WAL edits for all System tables including META + * Skips WAL edits for all System tables including hbase:meta. */ @InterfaceAudience.Private public class SystemTableWALEntryFilter implements WALEntryFilter { @Override public Entry filter(Entry entry) { - if (entry.getKey().getTableName().isSystemTable()) { - return null; - } - return entry; + return entry.getKey().getTableName().isSystemTable()? null: entry; } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/WALEntryFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/WALEntryFilter.java index cd3f1bdfb06..23c1c60f2db 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/WALEntryFilter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/WALEntryFilter.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -16,10 +16,9 @@ * limitations under the License. 
*/ package org.apache.hadoop.hbase.replication; - -import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.wal.WAL.Entry; +import org.apache.yetus.audience.InterfaceAudience; /** * A Filter for WAL entries before being sent over to replication. Multiple @@ -34,7 +33,6 @@ import org.apache.hadoop.hbase.wal.WAL.Entry; */ @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION) public interface WALEntryFilter { - /** *

* Applies the filter, possibly returning a different Entry instance. If null is returned, the @@ -49,5 +47,5 @@ public interface WALEntryFilter { * @return a (possibly modified) Entry to use. Returning null or an entry with no cells will cause * the entry to be skipped for replication. */ - public Entry filter(Entry entry); + Entry filter(Entry entry); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index bc1754904b8..1d9269d4abc 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -18,12 +18,12 @@ package org.apache.hadoop.hbase.replication.regionserver; import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.getArchivedLogPath; - import java.io.FileNotFoundException; import java.io.IOException; import java.lang.reflect.InvocationTargetException; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.Comparator; import java.util.HashMap; import java.util.List; @@ -36,6 +36,7 @@ import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Predicate; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -63,16 +64,15 @@ import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Threads; import 
org.apache.hadoop.hbase.wal.AbstractFSWALProvider; import org.apache.hadoop.hbase.wal.WAL.Entry; +import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - -import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.hbase.thirdparty.com.google.common.collect.Lists; /** * Class that handles the source of a replication stream. - * Currently does not handle more than 1 slave + * Currently does not handle more than 1 slave cluster. * For each slave cluster it selects a random number of peers * using a replication ratio. For example, if replication ration = 0.1 * and slave cluster has 100 region servers, 10 will be selected. @@ -119,8 +119,12 @@ public class ReplicationSource implements ReplicationSourceInterface { private int logQueueWarnThreshold; // ReplicationEndpoint which will handle the actual replication private volatile ReplicationEndpoint replicationEndpoint; - // A filter (or a chain of filters) for the WAL entries. + + /** + * A filter (or a chain of filters) for WAL entries; filters out edits. + */ protected volatile WALEntryFilter walEntryFilter; + // throttler private ReplicationThrottler throttler; private long defaultBandwidth; @@ -139,6 +143,39 @@ public class ReplicationSource implements ReplicationSourceInterface { private Thread initThread; + /** + * WALs to replicate. + * Predicate that returns 'true' for WALs to replicate and false for WALs to skip. + */ + private final Predicate filterInWALs; + + /** + * Base WALEntry filters for this class. Unmodifiable. Set on construction. + * Filters *out* edits we do not want replicated, passed on to replication endpoints. + * This is the basic set. Down in #initializeWALEntryFilter this set is added to the end of + * the WALEntry filter chain. 
These are put after those that we pick up from the configured + * endpoints and other machinations to create the final {@link #walEntryFilter}. + * @see WALEntryFilter + */ + private final List baseFilterOutWALEntries; + + ReplicationSource() { + // Default, filters *in* all WALs but meta WALs & filters *out* all WALEntries of System Tables. + this(p -> !AbstractFSWALProvider.isMetaFile(p), + Lists.newArrayList(new SystemTableWALEntryFilter())); + } + + /** + * @param replicateWAL Pass a filter to run against WAL Path; filter *in* WALs to Replicate; + * i.e. return 'true' if you want to replicate the content of the WAL. + * @param baseFilterOutWALEntries Base set of filters you want applied always; filters *out* + * WALEntries so they never make it out of this ReplicationSource. + */ + ReplicationSource(Predicate replicateWAL, List baseFilterOutWALEntries) { + this.filterInWALs = replicateWAL; + this.baseFilterOutWALEntries = Collections.unmodifiableList(baseFilterOutWALEntries); + } + /** * Instantiation method used by region servers * @param conf configuration to use @@ -192,30 +229,34 @@ public class ReplicationSource implements ReplicationSourceInterface { } @Override - public void enqueueLog(Path log) { - String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(log.getName()); + public void enqueueLog(Path wal) { + if (!this.filterInWALs.test(wal)) { + LOG.trace("NOT replicating {}", wal); + return; + } + String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(wal.getName()); PriorityBlockingQueue queue = queues.get(logPrefix); if (queue == null) { queue = new PriorityBlockingQueue<>(queueSizePerGroup, new LogsComparator()); // make sure that we do not use an empty queue when setting up a ReplicationSource, otherwise // the shipper may quit immediately - queue.put(log); + queue.put(wal); queues.put(logPrefix, queue); if (this.isSourceActive() && this.walEntryFilter != null) { // new wal group observed after source startup, start a new worker 
thread to track it - // notice: it's possible that log enqueued when this.running is set but worker thread + // notice: it's possible that wal enqueued when this.running is set but worker thread // still not launched, so it's necessary to check workerThreads before start the worker tryStartNewShipper(logPrefix, queue); } } else { - queue.put(log); + queue.put(wal); } if (LOG.isTraceEnabled()) { - LOG.trace("{} Added log file {} to queue of source {}.", logPeerId(), logPrefix, + LOG.trace("{} Added wal {} to queue of source {}.", logPeerId(), logPrefix, this.replicationQueueInfo.getQueueId()); } this.metrics.incrSizeOfLogQueue(); - // This will log a warning for each new log that gets created above the warn threshold + // This will log a warning for each new WAL that gets created above the warn threshold int queueSize = queue.size(); if (queueSize > this.logQueueWarnThreshold) { LOG.warn("{} WAL group {} queue size: {} exceeds value of " @@ -304,8 +345,7 @@ public class ReplicationSource implements ReplicationSourceInterface { private void initializeWALEntryFilter(UUID peerClusterId) { // get the WALEntryFilter from ReplicationEndpoint and add it to default filters - ArrayList filters = - Lists. newArrayList(new SystemTableWALEntryFilter()); + List filters = new ArrayList<>(this.baseFilterOutWALEntries); WALEntryFilter filterFromEndpoint = this.replicationEndpoint.getWALEntryfilter(); if (filterFromEndpoint != null) { filters.add(filterFromEndpoint); @@ -400,6 +440,16 @@ public class ReplicationSource implements ReplicationSourceInterface { : new ReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter, this); } + /** + * Call after {@link #initializeWALEntryFilter(UUID)} else it will be null. + * @return The WAL Entry Filter Chain this ReplicationSource will use on WAL files filtering + * out WALEntry edits. 
+ */ + @VisibleForTesting + WALEntryFilter getWalEntryFilter() { + return walEntryFilter; + } + protected final void uncaughtException(Thread t, Throwable e) { RSRpcServices.exitIfOOME(e); LOG.error("Unexpected exception in {} currentPath={}", @@ -622,7 +672,10 @@ public class ReplicationSource implements ReplicationSourceInterface { } } if (clearMetrics) { - this.metrics.clear(); + // Can be null in test context. + if (this.metrics != null) { + this.metrics.clear(); + } } } @@ -647,10 +700,6 @@ public class ReplicationSource implements ReplicationSourceInterface { return !this.server.isStopped() && this.sourceRunning; } - public UUID getPeerClusterUUID(){ - return this.clusterId; - } - /** * Comparator used to compare logs together based on their start time */ diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java index 10db1322a06..94ae7046779 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -45,7 +45,6 @@ import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceStability; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.hbase.thirdparty.com.google.common.collect.Lists; @@ -307,7 +306,6 @@ public abstract class AbstractFSWALProvider> implemen * Construct the directory name for all old WALs on a given server. The default old WALs dir looks * like: hbase/oldWALs. 
If you config hbase.separate.oldlogdir.by.regionserver to * true, it looks like hbase//oldWALs/kalashnikov.att.net,61634,1486865297088. - * @param conf * @param serverName Server name formatted as described in {@link ServerName} * @return the relative WAL directory name */ @@ -413,11 +411,11 @@ public abstract class AbstractFSWALProvider> implemen return isMetaFile(p.getName()); } + /** + * @return True if String ends in {@link #META_WAL_PROVIDER_ID} + */ public static boolean isMetaFile(String p) { - if (p != null && p.endsWith(META_WAL_PROVIDER_ID)) { - return true; - } - return false; + return p != null && p.endsWith(META_WAL_PROVIDER_ID); } public static boolean isArchivedLogFile(Path p) { @@ -460,12 +458,9 @@ public abstract class AbstractFSWALProvider> implemen * @param path path to WAL file * @param conf configuration * @return WAL Reader instance - * @throws IOException */ public static org.apache.hadoop.hbase.wal.WAL.Reader openReader(Path path, Configuration conf) - throws IOException - - { + throws IOException { long retryInterval = 2000; // 2 sec int maxAttempts = 30; int attempt = 0; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java index 274ccabfbea..6323da3e3e9 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -16,13 +16,14 @@ * limitations under the License. 
*/ package org.apache.hadoop.hbase.replication.regionserver; - +import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.META_WAL_PROVIDER_ID; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; - +import static org.junit.Assert.assertTrue; import java.io.IOException; import java.util.OptionalLong; +import java.util.UUID; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; @@ -41,19 +42,14 @@ import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.Waiter; -import org.apache.hadoop.hbase.Waiter.Predicate; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.regionserver.RegionServerServices; import org.apache.hadoop.hbase.replication.ReplicationEndpoint; import org.apache.hadoop.hbase.replication.ReplicationPeer; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; -import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint; -import org.apache.hadoop.hbase.replication.regionserver.RecoveredReplicationSource; -import org.apache.hadoop.hbase.replication.regionserver.RecoveredReplicationSourceShipper; -import org.apache.hadoop.hbase.replication.regionserver.Replication; -import org.apache.hadoop.hbase.replication.regionserver.ReplicationSource; -import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager; +import org.apache.hadoop.hbase.replication.WALEntryFilter; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.ReplicationTests; import org.apache.hadoop.hbase.util.Bytes; @@ -63,7 +59,6 @@ import org.apache.hadoop.hbase.wal.WALFactory; 
import org.apache.hadoop.hbase.wal.WALKeyImpl; import org.apache.hadoop.hbase.wal.WALProvider; import org.junit.AfterClass; -import org.junit.Assert; import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.Test; @@ -96,9 +91,13 @@ public class TestReplicationSource { FS = TEST_UTIL.getDFSCluster().getFileSystem(); Path rootDir = TEST_UTIL.createRootDir(); oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME); - if (FS.exists(oldLogDir)) FS.delete(oldLogDir, true); + if (FS.exists(oldLogDir)) { + FS.delete(oldLogDir, true); + } logDir = new Path(rootDir, HConstants.HREGION_LOGDIR_NAME); - if (FS.exists(logDir)) FS.delete(logDir, true); + if (FS.exists(logDir)) { + FS.delete(logDir, true); + } } @AfterClass @@ -108,16 +107,100 @@ public class TestReplicationSource { TEST_UTIL.shutdownMiniDFSCluster(); } + /** + * Test the default ReplicationSource skips queuing hbase:meta WAL files. + */ + @Test + public void testDefaultSkipsMetaWAL() throws IOException { + ReplicationSource rs = new ReplicationSource(); + Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); + conf.setInt("replication.source.maxretriesmultiplier", 1); + ReplicationPeer mockPeer = Mockito.mock(ReplicationPeer.class); + Mockito.when(mockPeer.getConfiguration()).thenReturn(conf); + Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L); + ReplicationPeerConfig peerConfig = Mockito.mock(ReplicationPeerConfig.class); + Mockito.when(peerConfig.getReplicationEndpointImpl()). 
+ thenReturn(DoNothingReplicationEndpoint.class.getName()); + Mockito.when(mockPeer.getPeerConfig()).thenReturn(peerConfig); + ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class); + Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong()); + String queueId = "qid"; + RegionServerServices rss = + TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1")); + rs.init(conf, null, manager, null, mockPeer, rss, queueId, null, + p -> OptionalLong.empty(), new MetricsSource(queueId)); + try { + rs.startup(); + assertTrue(rs.isSourceActive()); + assertEquals(0, rs.getSourceMetrics().getSizeOfLogQueue()); + rs.enqueueLog(new Path("a.1" + META_WAL_PROVIDER_ID)); + assertEquals(0, rs.getSourceMetrics().getSizeOfLogQueue()); + rs.enqueueLog(new Path("a.1")); + assertEquals(1, rs.getSourceMetrics().getSizeOfLogQueue()); + } finally { + rs.terminate("Done"); + rss.stop("Done"); + } + } + + /** + * Test that we filter out meta edits, etc. + */ + @Test + public void testWALEntryFilter() throws IOException { + // To get the fully constructed default WALEntryFilter, need to create a ReplicationSource + // instance and init it. + ReplicationSource rs = new ReplicationSource(); + UUID uuid = UUID.randomUUID(); + Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); + ReplicationPeer mockPeer = Mockito.mock(ReplicationPeer.class); + Mockito.when(mockPeer.getConfiguration()).thenReturn(conf); + Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L); + ReplicationPeerConfig peerConfig = Mockito.mock(ReplicationPeerConfig.class); + Mockito.when(peerConfig.getReplicationEndpointImpl()). 
+ thenReturn(DoNothingReplicationEndpoint.class.getName()); + Mockito.when(mockPeer.getPeerConfig()).thenReturn(peerConfig); + ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class); + Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong()); + String queueId = "qid"; + RegionServerServices rss = + TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1")); + rs.init(conf, null, manager, null, mockPeer, rss, queueId, + uuid, p -> OptionalLong.empty(), new MetricsSource(queueId)); + try { + rs.startup(); + TEST_UTIL.waitFor(30000, () -> rs.getWalEntryFilter() != null); + WALEntryFilter wef = rs.getWalEntryFilter(); + // Test non-system WAL edit. + WAL.Entry e = new WAL.Entry(new WALKeyImpl(HConstants.EMPTY_BYTE_ARRAY, + TableName.valueOf("test"), -1), new WALEdit()); + assertTrue(wef.filter(e) == e); + // Test system WAL edit. + e = new WAL.Entry( + new WALKeyImpl(HConstants.EMPTY_BYTE_ARRAY, TableName.META_TABLE_NAME, -1), + new WALEdit()); + assertNull(wef.filter(e)); + } finally { + rs.terminate("Done"); + rss.stop("Done"); + } + } + /** * Sanity check that we can move logs around while we are reading * from them. Should this test fail, ReplicationSource would have a hard * time reading logs that are being archived. */ + // This test doesn't belong in here... it is not about ReplicationSource. 
@Test public void testLogMoving() throws Exception{ Path logPath = new Path(logDir, "log"); - if (!FS.exists(logDir)) FS.mkdirs(logDir); - if (!FS.exists(oldLogDir)) FS.mkdirs(oldLogDir); + if (!FS.exists(logDir)) { + FS.mkdirs(logDir); + } + if (!FS.exists(oldLogDir)) { + FS.mkdirs(oldLogDir); + } WALProvider.Writer writer = WALFactory.createWALWriter(FS, logPath, TEST_UTIL.getConfiguration()); for(int i = 0; i < 3; i++) { @@ -142,7 +225,7 @@ public class TestReplicationSource { entry = reader.next(); assertNotNull(entry); - entry = reader.next(); + reader.next(); entry = reader.next(); assertNull(entry); @@ -151,47 +234,31 @@ public class TestReplicationSource { /** * Tests that {@link ReplicationSource#terminate(String)} will timeout properly + * Moved here from TestReplicationSource because doesn't need cluster. */ @Test public void testTerminateTimeout() throws Exception { ReplicationSource source = new ReplicationSource(); - ReplicationEndpoint replicationEndpoint = new HBaseInterClusterReplicationEndpoint() { - @Override - protected void doStart() { - notifyStarted(); - } - - @Override - protected void doStop() { - // not calling notifyStopped() here causes the caller of stop() to get a Future that never - // completes - } - }; - replicationEndpoint.start(); - ReplicationPeer mockPeer = Mockito.mock(ReplicationPeer.class); - Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L); - Configuration testConf = HBaseConfiguration.create(); - testConf.setInt("replication.source.maxretriesmultiplier", 1); - ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class); - Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong()); - source.init(testConf, null, manager, null, mockPeer, null, "testPeer", null, - p -> OptionalLong.empty(), null); - ExecutorService executor = Executors.newSingleThreadExecutor(); - Future future = executor.submit(new Runnable() { - - @Override - public void run() { - source.terminate("testing source 
termination"); - } - }); - long sleepForRetries = testConf.getLong("replication.source.sleepforretries", 1000); - Waiter.waitFor(testConf, sleepForRetries * 2, new Predicate() { - - @Override - public boolean evaluate() throws Exception { - return future.isDone(); - } - }); + ReplicationEndpoint + replicationEndpoint = new DoNothingReplicationEndpoint(); + try { + replicationEndpoint.start(); + ReplicationPeer mockPeer = Mockito.mock(ReplicationPeer.class); + Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L); + Configuration testConf = HBaseConfiguration.create(); + testConf.setInt("replication.source.maxretriesmultiplier", 1); + ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class); + Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong()); + source.init(testConf, null, manager, null, mockPeer, null, "testPeer", + null, p -> OptionalLong.empty(), null); + ExecutorService executor = Executors.newSingleThreadExecutor(); + Future future = executor.submit( + () -> source.terminate("testing source termination")); + long sleepForRetries = testConf.getLong("replication.source.sleepforretries", 1000); + Waiter.waitFor(testConf, sleepForRetries * 2, (Waiter.Predicate) future::isDone); + } finally { + replicationEndpoint.stop(); + } } /** @@ -211,10 +278,10 @@ public class TestReplicationSource { HRegionServer serverA = cluster.getRegionServer(0); final ReplicationSourceManager managerA = - ((Replication) serverA.getReplicationSourceService()).getReplicationManager(); + serverA.getReplicationSourceService().getReplicationManager(); HRegionServer serverB = cluster.getRegionServer(1); final ReplicationSourceManager managerB = - ((Replication) serverB.getReplicationSourceService()).getReplicationManager(); + serverB.getReplicationSourceService().getReplicationManager(); final Admin admin = TEST_UTIL.getAdmin(); final String peerId = "TestPeer"; @@ -222,7 +289,7 @@ public class TestReplicationSource { 
ReplicationPeerConfig.newBuilder().setClusterKey(TEST_UTIL_PEER.getClusterKey()).build()); // Wait for replication sources to come up Waiter.waitFor(conf, 20000, new Waiter.Predicate() { - @Override public boolean evaluate() throws Exception { + @Override public boolean evaluate() { return !(managerA.getSources().isEmpty() || managerB.getSources().isEmpty()); } }); @@ -257,17 +324,11 @@ public class TestReplicationSource { // 1. The serverB's normal queue // 2. serverA's recovered queue on serverB cluster.stopRegionServer(serverB.getServerName()); - Waiter.waitFor(conf, 20000, new Waiter.Predicate() { - @Override public boolean evaluate() throws Exception { - return managerC.getOldSources().size() == 2; - } - }); + Waiter.waitFor(conf, 20000, + (Waiter.Predicate) () -> managerC.getOldSources().size() == 2); admin.enableReplicationPeer(peerId); - Waiter.waitFor(conf, 20000, new Waiter.Predicate() { - @Override public boolean evaluate() throws Exception { - return managerC.getOldSources().size() == 0; - } - }); + Waiter.waitFor(conf, 20000, + (Waiter.Predicate) () -> managerC.getOldSources().size() == 0); } finally { conf.set(HConstants.REGION_SERVER_IMPL, HRegionServer.class.getName()); } @@ -277,7 +338,7 @@ public class TestReplicationSource { * Regionserver implementation that adds a delay on the graceful shutdown. */ public static class ShutdownDelayRegionServer extends HRegionServer { - public ShutdownDelayRegionServer(Configuration conf) throws IOException, InterruptedException { + public ShutdownDelayRegionServer(Configuration conf) throws IOException { super(conf); } @@ -295,7 +356,39 @@ public class TestReplicationSource { } } - // Test HBASE-20497 + /** + * Deadend Endpoint. Does nothing. 
+ */ + public static class DoNothingReplicationEndpoint extends HBaseInterClusterReplicationEndpoint { + private final UUID uuid = UUID.randomUUID(); + + @Override public void init(Context context) throws IOException { + this.ctx = context; + } + + @Override public WALEntryFilter getWALEntryfilter() { + return null; + } + + @Override public synchronized UUID getPeerUUID() { + return this.uuid; + } + + @Override + protected void doStart() { + notifyStarted(); + } + + @Override + protected void doStop() { + notifyStopped(); + } + } + + /** + * Test HBASE-20497 + * Moved here from TestReplicationSource because doesn't need cluster. + */ @Test public void testRecoveredReplicationSourceShipperGetPosition() throws Exception { String walGroupId = "fake-wal-group-id"; @@ -310,14 +403,14 @@ public class TestReplicationSource { Mockito.when(source.getServerWALsBelongTo()).thenReturn(deadServer); ReplicationQueueStorage storage = Mockito.mock(ReplicationQueueStorage.class); Mockito.when(storage.getWALPosition(Mockito.eq(serverName), Mockito.any(), Mockito.any())) - .thenReturn(1001L); + .thenReturn(1001L); Mockito.when(storage.getWALPosition(Mockito.eq(deadServer), Mockito.any(), Mockito.any())) - .thenReturn(-1L); + .thenReturn(-1L); + Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); conf.setInt("replication.source.maxretriesmultiplier", -1); RecoveredReplicationSourceShipper shipper = - new RecoveredReplicationSourceShipper(conf, walGroupId, queue, source, storage); - Assert.assertEquals(1001L, shipper.getStartPosition()); - conf.unset("replication.source.maxretriesmultiplier"); + new RecoveredReplicationSourceShipper(conf, walGroupId, queue, source, storage); + assertEquals(1001L, shipper.getStartPosition()); } }