diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index 144f358a354..69a3a51c5ad 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -858,7 +858,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf new ClusterMarkingEntryFilter(clusterId, peerClusterId, replicationEndpoint)); ChainWALEntryFilter readerFilter = new ChainWALEntryFilter(filters); entryReader = new ReplicationSourceWALReaderThread(manager, replicationQueueInfo, queue, - startPosition, fs, conf, readerFilter, metrics); + startPosition, fs, conf, readerFilter, metrics, ReplicationSource.this); Threads.setDaemonThreadRunning(entryReader, threadName + ".replicationSource.replicationWALReaderThread." + walGroupId + "," + peerClusterZnode, handler); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java index f795db9404d..e3c4f87f1f6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java @@ -80,6 +80,8 @@ public class ReplicationSourceWALReaderThread extends Thread { private AtomicLong totalBufferUsed; private long totalBufferQuota; + private ReplicationSource source; + /** * Creates a reader worker for a given WAL queue. Reads WAL entries off a given queue, batches the * entries, and puts them on a batch queue. @@ -94,8 +96,8 @@ public class ReplicationSourceWALReaderThread extends Thread { */ public ReplicationSourceWALReaderThread(ReplicationSourceManager manager, ReplicationQueueInfo replicationQueueInfo, PriorityBlockingQueue logQueue, - long startPosition, - FileSystem fs, Configuration conf, WALEntryFilter filter, MetricsSource metrics) { + long startPosition, FileSystem fs, Configuration conf, WALEntryFilter filter, + MetricsSource metrics, ReplicationSource source) { this.replicationQueueInfo = replicationQueueInfo; this.logQueue = logQueue; this.lastReadPath = logQueue.peek(); @@ -118,6 +120,7 @@ public class ReplicationSourceWALReaderThread extends Thread { this.conf.getInt("replication.source.maxretriesmultiplier", 300); // 5 minutes @ 1 sec per this.metrics = metrics; this.entryBatchQueue = new LinkedBlockingQueue<>(batchCount); + this.source = source; LOG.info("peerClusterZnode=" + replicationQueueInfo.getPeerClusterZnode() + ", ReplicationSourceWALReaderThread : " + replicationQueueInfo.getPeerId() + " inited, replicationBatchSizeCapacity=" + replicationBatchSizeCapacity @@ -132,6 +135,10 @@ public class ReplicationSourceWALReaderThread extends Thread { try (WALEntryStream entryStream = new WALEntryStream(logQueue, fs, conf, lastReadPosition, metrics)) { while (isReaderRunning()) { // loop here to keep reusing stream while we can + if (!source.isPeerEnabled()) { + Threads.sleep(sleepForRetries); + continue; + } if (!checkQuota()) { continue; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java index 7ad7260c3c3..1828ad8f9cf 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java @@ -27,6 +27,8 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.timeout; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import java.io.IOException; @@ -38,7 +40,12 @@ import java.util.Map; import java.util.NavigableMap; import java.util.NoSuchElementException; import java.util.TreeMap; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.PriorityBlockingQueue; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import org.apache.hadoop.conf.Configuration; @@ -81,7 +88,9 @@ import org.junit.experimental.categories.Category; import org.junit.rules.TestName; import org.junit.runner.RunWith; import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; import org.mockito.runners.MockitoJUnitRunner; +import org.mockito.stubbing.Answer; @RunWith(MockitoJUnitRunner.class) @Category({ ReplicationTests.class, LargeTests.class }) @@ -359,10 +368,12 @@ public class TestWALEntryStream { // start up a batcher ReplicationSourceManager mockSourceManager = Mockito.mock(ReplicationSourceManager.class); + ReplicationSource source = Mockito.mock(ReplicationSource.class); + when(source.isPeerEnabled()).thenReturn(true); when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0)); ReplicationSourceWALReaderThread batcher = new ReplicationSourceWALReaderThread(mockSourceManager, getQueueInfo(),walQueue, 0, - fs, conf, getDummyFilter(), new MetricsSource("1")); + fs, conf, getDummyFilter(), new MetricsSource("1"), source); Path walPath = walQueue.peek(); batcher.start(); WALEntryBatch entryBatch = batcher.take(); @@ -398,10 +409,13 @@ public class TestWALEntryStream { } ReplicationSourceManager mockSourceManager = mock(ReplicationSourceManager.class); + ReplicationSource source = Mockito.mock(ReplicationSource.class); + when(source.isPeerEnabled()).thenReturn(true); when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0)); ReplicationSourceWALReaderThread reader = new ReplicationSourceWALReaderThread(mockSourceManager, getRecoveredQueueInfo(), - walQueue, 0, fs, conf, getDummyFilter(), new MetricsSource("1")); + walQueue, 0, fs, conf, getDummyFilter(), + new MetricsSource("1"), source); Path walPath = walQueue.toArray(new Path[2])[1]; reader.start(); WALEntryBatch entryBatch = reader.take(); @@ -456,10 +470,12 @@ public class TestWALEntryStream { appendEntriesToLog(2); ReplicationSourceManager mockSourceManager = mock(ReplicationSourceManager.class); + ReplicationSource source = Mockito.mock(ReplicationSource.class); + when(source.isPeerEnabled()).thenReturn(true); when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0)); final ReplicationSourceWALReaderThread reader = new ReplicationSourceWALReaderThread(mockSourceManager, getQueueInfo(), walQueue, - 0, fs, conf, filter, new MetricsSource("1")); + 0, fs, conf, filter, new MetricsSource("1"), source); reader.start(); WALEntryBatch entryBatch = reader.take(); @@ -490,10 +506,12 @@ public class TestWALEntryStream { final long eof = getPosition(firstWAL); ReplicationSourceManager mockSourceManager = mock(ReplicationSourceManager.class); + ReplicationSource source = Mockito.mock(ReplicationSource.class); + when(source.isPeerEnabled()).thenReturn(true); when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0)); final ReplicationSourceWALReaderThread reader = new ReplicationSourceWALReaderThread(mockSourceManager, getQueueInfo(), walQueue, - 0, fs, conf, filter, new MetricsSource("1")); + 0, fs, conf, filter, new MetricsSource("1"), source); reader.start(); // reader won't put any batch, even if EOF reached. @@ -613,4 +631,65 @@ public class TestWALEntryStream { currentPath = newPath; } } + + @Test + public void testReplicationSourceWALReaderDisabled() + throws IOException, InterruptedException, ExecutionException { + for(int i=0; i<3; i++) { + //append and sync + appendToLog("key" + i); + } + // get ending position + long position; + try (WALEntryStream entryStream = + new WALEntryStream(walQueue, fs, conf, 0, new MetricsSource("1"))) { + entryStream.next(); + entryStream.next(); + entryStream.next(); + position = entryStream.getPosition(); + } + + // start up a reader + Path walPath = walQueue.peek(); + ReplicationSource source = Mockito.mock(ReplicationSource.class); + when(source.getSourceMetrics()).thenReturn(new MetricsSource("1")); + + final AtomicBoolean enabled = new AtomicBoolean(false); + when(source.isPeerEnabled()).thenAnswer(new Answer() { + @Override + public Boolean answer(InvocationOnMock invocationOnMock) throws Throwable { + return enabled.get(); + } + }); + + ReplicationSourceManager mockSourceManager = mock(ReplicationSourceManager.class); + when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0)); + final ReplicationSourceWALReaderThread reader = + new ReplicationSourceWALReaderThread(mockSourceManager, getQueueInfo(), walQueue, + 0, fs, conf, getDummyFilter(), new MetricsSource("1"), source); + + reader.start(); + Future future = + Executors.newSingleThreadExecutor().submit(new Callable() { + @Override + public WALEntryBatch call() throws Exception { + return reader.take(); + } + }); + + // make sure that the isPeerEnabled has been called several times + verify(source, timeout(30000).atLeast(5)).isPeerEnabled(); + // confirm that we can read nothing if the peer is disabled + assertFalse(future.isDone()); + // then enable the peer, we should get the batch + enabled.set(true); + WALEntryBatch entryBatch = future.get(); + + // should've batched up our entries + assertNotNull(entryBatch); + assertEquals(3, entryBatch.getWalEntries().size()); + assertEquals(position, entryBatch.getLastWalPosition()); + assertEquals(walPath, entryBatch.getLastWalPath()); + assertEquals(3, entryBatch.getNbRowKeys()); + } }