From f73986d11045902e82668408b09d0488aef9d1c0 Mon Sep 17 00:00:00 2001 From: zhangduo Date: Sat, 14 Apr 2018 22:00:15 +0800 Subject: [PATCH] HBASE-20417 Do not read wal entries when peer is disabled --- .../ReplicationSourceShipper.java | 15 ++--- .../ReplicationSourceWALReader.java | 4 ++ .../regionserver/TestWALEntryStream.java | 59 ++++++++++++++++++- 3 files changed, 68 insertions(+), 10 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java index 2097d00817d..11fd660c2b6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java @@ -87,12 +87,11 @@ public class ReplicationSourceShipper extends Thread { setWorkerState(WorkerState.RUNNING); // Loop until we close down while (isActive()) { - int sleepMultiplier = 1; // Sleep until replication is enabled again if (!source.isPeerEnabled()) { - if (sleepForRetries("Replication is disabled", sleepMultiplier)) { - sleepMultiplier++; - } + // The peer enabled check is in memory, not expensive, so do not need to increase the + // sleep interval as it may cause a long lag when we enable the peer. + sleepForRetries("Replication is disabled", 1); continue; } try { @@ -188,8 +187,8 @@ public class ReplicationSourceShipper extends Thread { } break; } catch (Exception ex) { - LOG.warn(source.getReplicationEndpoint().getClass().getName() + " threw unknown exception:" - + org.apache.hadoop.util.StringUtils.stringifyException(ex)); + LOG.warn("{} threw unknown exception:", + source.getReplicationEndpoint().getClass().getName(), ex); if (sleepForRetries("ReplicationEndpoint threw exception", sleepMultiplier)) { sleepMultiplier++; } @@ -292,9 +291,7 @@ public class ReplicationSourceShipper extends Thread { */ public boolean sleepForRetries(String msg, int sleepMultiplier) { try { - if (LOG.isTraceEnabled()) { - LOG.trace(msg + ", sleeping " + sleepForRetries + " times " + sleepMultiplier); - } + LOG.trace("{}, sleeping {} times {}", msg, sleepForRetries, sleepMultiplier); Thread.sleep(this.sleepForRetries * sleepMultiplier); } catch (InterruptedException e) { LOG.debug("Interrupted while sleeping between retries"); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java index 7ba347feb6c..64fd48df900 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java @@ -126,6 +126,10 @@ class ReplicationSourceWALReader extends Thread { source.getWALFileLengthProvider(), source.getServerWALsBelongTo(), source.getSourceMetrics())) { while (isReaderRunning()) { // loop here to keep reusing stream while we can + if (!source.isPeerEnabled()) { + Threads.sleep(sleepForRetries); + continue; + } if (!checkQuota()) { continue; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java index 2670756ef1d..35e4f8208b4 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java @@ -30,7 +30,12 @@ import java.io.IOException; import java.util.NavigableMap; import java.util.OptionalLong; import java.util.TreeMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.Future; import java.util.concurrent.PriorityBlockingQueue; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -328,7 +333,7 @@ public class TestWALEntryStream { } } - private ReplicationSourceWALReader createReader(boolean recovered, Configuration conf) { + private ReplicationSource mockReplicationSource(boolean recovered, Configuration conf) { ReplicationSourceManager mockSourceManager = Mockito.mock(ReplicationSourceManager.class); when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0)); Server mockServer = Mockito.mock(Server.class); @@ -338,6 +343,12 @@ public class TestWALEntryStream { when(source.getWALFileLengthProvider()).thenReturn(log); when(source.getServer()).thenReturn(mockServer); when(source.isRecovered()).thenReturn(recovered); + return source; + } + + private ReplicationSourceWALReader createReader(boolean recovered, Configuration conf) { + ReplicationSource source = mockReplicationSource(recovered, conf); + when(source.isPeerEnabled()).thenReturn(true); ReplicationSourceWALReader reader = new ReplicationSourceWALReader(fs, conf, walQueue, 0, getDummyFilter(), source); reader.start(); @@ -460,6 +471,52 @@ public class TestWALEntryStream { assertFalse(entryBatch.isEndOfFile()); } + @Test + public void testReplicationSourceWALReaderDisabled() + throws IOException, InterruptedException, ExecutionException { + appendEntriesToLogAndSync(3); + // get ending position + long position; + try (WALEntryStream entryStream = + new WALEntryStream(walQueue, fs, CONF, 0, log, null, new MetricsSource("1"))) { + entryStream.next(); + entryStream.next(); + entryStream.next(); + position = entryStream.getPosition(); + } + + // start up a reader + Path walPath = walQueue.peek(); + ReplicationSource source = mockReplicationSource(false, CONF); + AtomicInteger invokeCount = new AtomicInteger(0); + AtomicBoolean enabled = new AtomicBoolean(false); + when(source.isPeerEnabled()).then(i -> { + invokeCount.incrementAndGet(); + return enabled.get(); + }); + + ReplicationSourceWALReader reader = + new ReplicationSourceWALReader(fs, CONF, walQueue, 0, getDummyFilter(), source); + reader.start(); + Future future = ForkJoinPool.commonPool().submit(() -> { + return reader.take(); + }); + // make sure that the isPeerEnabled has been called several times + TEST_UTIL.waitFor(30000, () -> invokeCount.get() >= 5); + // confirm that we can read nothing if the peer is disabled + assertFalse(future.isDone()); + // then enable the peer, we should get the batch + enabled.set(true); + WALEntryBatch entryBatch = future.get(); + + // should've batched up our entries + assertNotNull(entryBatch); + assertEquals(3, entryBatch.getWalEntries().size()); + assertEquals(position, entryBatch.getLastWalPosition()); + assertEquals(walPath, entryBatch.getLastWalPath()); + assertEquals(3, entryBatch.getNbRowKeys()); + } + private String getRow(WAL.Entry entry) { Cell cell = entry.getEdit().getCells().get(0); return Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());