From dedb630da04305e5b90dacf8406fc2a92be2882e Mon Sep 17 00:00:00 2001
From: Wellington Ramos Chevreuil
Date: Thu, 1 Oct 2020 08:03:46 +0100
Subject: [PATCH] =?UTF-8?q?HBASE-24813=20ReplicationSource=20should=20clea?=
 =?UTF-8?q?r=20buffer=20usage=20on=20Replicatio=E2=80=A6=20(#2453)?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Signed-off-by: Sean Busbey
---
 .../regionserver/ReplicationSource.java       | 15 +++---
 .../ReplicationSourceShipper.java             | 52 +++++++++++++++++++
 .../ReplicationSourceWALReader.java           |  3 +-
 .../regionserver/TestReplicationSource.java   | 50 ++++++++++++++++--
 4 files changed, 109 insertions(+), 11 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 0b7ed16cc96..f94c9c64fd0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -1,4 +1,4 @@
-/*
+/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
@@ -627,14 +627,12 @@ public class ReplicationSource implements ReplicationSourceInterface {
       Threads.shutdown(initThread, this.sleepForRetries);
     }
     Collection<ReplicationSourceShipper> workers = workerThreads.values();
-    for (ReplicationSourceShipper worker : workers) {
-      worker.stopWorker();
-      if(worker.entryReader != null) {
-        worker.entryReader.setReaderRunning(false);
-      }
-    }

     for (ReplicationSourceShipper worker : workers) {
+      worker.stopWorker();
+      if (worker.entryReader != null) {
+        worker.entryReader.setReaderRunning(false);
+      }
       if (worker.isAlive() || worker.entryReader.isAlive()) {
         try {
           // Wait worker to stop
@@ -652,6 +650,9 @@ public class ReplicationSource implements ReplicationSourceInterface {
           worker.entryReader.interrupt();
         }
       }
+      // If the worker is already stopped but there were still entries batched,
+      // we need to clear the buffer used for unprocessed entries
+      worker.clearWALEntryBatch();
     }

     if (this.replicationEndpoint != null) {
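For readers following the terminate() change above: entries that the WAL reader batches for a shipper are charged against the global ReplicationSourceManager.totalBufferUsed quota, and before this change any batches still queued when a source was terminated stayed charged forever, slowly starving other sources of buffer space. The standalone sketch below, which is not part of the patch and whose names other than totalBufferUsed are illustrative, shows the accounting invariant that the new worker.clearWALEntryBatch() call restores: whatever the reading side adds to the shared counter must eventually be subtracted, either when a batch is shipped or when the leftover queue is drained on termination.

  // Standalone sketch, not HBase code: illustrates the buffer-quota invariant.
  // Only the idea of "totalBufferUsed" comes from the patch; everything else is made up.
  import java.util.concurrent.BlockingQueue;
  import java.util.concurrent.LinkedBlockingQueue;
  import java.util.concurrent.atomic.AtomicLong;

  public class BufferQuotaSketch {
    // Global quota shared by all sources, analogous to ReplicationSourceManager.totalBufferUsed.
    private final AtomicLong totalBufferUsed = new AtomicLong();
    // Batch sizes queued by a reader for a shipper, analogous to entryBatchQueue.
    private final BlockingQueue<Long> pendingBatchSizes = new LinkedBlockingQueue<>();

    // Reading side: every queued batch charges its size against the global quota.
    void enqueue(long batchSize) throws InterruptedException {
      totalBufferUsed.addAndGet(batchSize);
      pendingBatchSizes.put(batchSize);
    }

    // Shipping side: a shipped batch releases its share of the quota.
    void ship() throws InterruptedException {
      long shipped = pendingBatchSizes.take();
      totalBufferUsed.addAndGet(-shipped);
    }

    // Termination path: anything still queued must be released as well, otherwise the
    // quota leaks and other sources eventually stall on a "full" buffer.
    void drainOnTermination() {
      long toRelease = 0;
      Long size;
      while ((size = pendingBatchSizes.poll()) != null) {
        toRelease += size;
      }
      totalBufferUsed.addAndGet(-toRelease);
    }
  }

In the patch itself the enqueue side lives in ReplicationSourceWALReader and the drain side is the new clearWALEntryBatch() in ReplicationSourceShipper, shown next.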
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
index 92646d2cac4..3a23c67dd97 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
@@ -21,12 +21,15 @@ import static org.apache.hadoop.hbase.replication.ReplicationUtils.getAdaptiveTi
 import java.io.IOException;
 import java.util.List;
 import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.atomic.LongAccumulator;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.hadoop.hbase.wal.WALEdit;
@@ -331,4 +334,53 @@ public class ReplicationSourceShipper extends Thread {
     }
     return sleepMultiplier < maxRetriesMultiplier;
   }
+
+  /**
+   * Attempts to properly update ReplicationSourceManager.totalBufferUsed,
+   * in case there were unprocessed entries batched by the reader for the shipper,
+   * but the shipper didn't manage to ship them because the replication source is being
+   * terminated. In that case, it iterates through the batched entries and decrements the
+   * pending entries' size from ReplicationSourceManager.totalBufferUsed.
+   *
+   * NOTES
+   * 1) This method should only be called upon replication source termination.
+   * It blocks, waiting for both the shipper and reader threads to terminate,
+   * to make sure there are no race conditions when updating
+   * ReplicationSourceManager.totalBufferUsed.
+   *
+   * 2) It does not attempt to terminate the reader and shipper threads. Their
+   * interruption/termination must have been triggered before calling this method.
+   */
+  void clearWALEntryBatch() {
+    long timeout = EnvironmentEdgeManager.currentTime() + this.shipEditsTimeout;
+    while (this.isAlive() || this.entryReader.isAlive()) {
+      try {
+        if (EnvironmentEdgeManager.currentTime() >= timeout) {
+          LOG.warn("Interrupting source thread for peer {} without cleaning buffer usage "
+            + "because clearWALEntryBatch method timed out whilst waiting for the "
+            + "reader/shipper thread to stop.", this.source.getPeerId());
+          Thread.currentThread().interrupt();
+        } else {
+          // Wait for both the shipper and reader threads to stop
+          Thread.sleep(this.sleepForRetries);
+        }
+      } catch (InterruptedException e) {
+        LOG.warn("{} Interrupted while waiting for {} to stop on clearWALEntryBatch: {}",
+          this.source.getPeerId(), this.getName(), e);
+        Thread.currentThread().interrupt();
+      }
+    }
+    LongAccumulator totalToDecrement = new LongAccumulator((a, b) -> a + b, 0);
+    entryReader.entryBatchQueue.forEach(w -> {
+      entryReader.entryBatchQueue.remove(w);
+      w.getWalEntries().forEach(e -> {
+        long entrySizeExcludeBulkLoad = entryReader.getEntrySizeExcludeBulkLoad(e);
+        totalToDecrement.accumulate(entrySizeExcludeBulkLoad);
+      });
+    });
+
+    LOG.trace("Decrementing totalBufferUsed by {}B while stopping Replication WAL Readers.",
+      totalToDecrement.longValue());
+    source.getSourceManager().getTotalBufferUsed().addAndGet(-totalToDecrement.longValue());
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
index dd5b4dc3008..1ff0a8b1b7b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
@@ -60,7 +60,8 @@ class ReplicationSourceWALReader extends Thread {
   private final WALEntryFilter filter;
   private final ReplicationSource source;

-  private final BlockingQueue<WALEntryBatch> entryBatchQueue;
+  @InterfaceAudience.Private
+  final BlockingQueue<WALEntryBatch> entryBatchQueue;
   // max (heap) size of each batch - multiply by number of batches in queue to get total
   private final long replicationBatchSizeCapacity;
   // max count of each batch - multiply by number of batches in queue to get total
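The Javadoc above insists that clearWALEntryBatch() only runs after both the shipper and the reader have terminated; the method enforces this with a bounded poll loop, capped by shipEditsTimeout, before it drains entryBatchQueue, so the amount it subtracts from totalBufferUsed cannot race with a reader that is still enqueuing. Below is a minimal standalone sketch of that bounded-wait pattern; the class, method, and timing parameters are illustrative and not HBase API.

  // Standalone sketch of a bounded wait before touching state shared with a worker thread.
  final class BoundedShutdownWait {
    // Returns true once the worker thread has died, false on timeout or interruption.
    static boolean awaitDeath(Thread worker, long timeoutMillis, long pollMillis) {
      long deadline = System.currentTimeMillis() + timeoutMillis;
      while (worker.isAlive()) {
        if (System.currentTimeMillis() >= deadline) {
          return false;              // caller decides how to handle the timeout
        }
        try {
          Thread.sleep(pollMillis);  // poll instead of join() to keep one hard deadline
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          return false;
        }
      }
      return true;                   // now safe to drain state the worker was writing
    }
  }

Polling against a single deadline, rather than calling join() per thread, is what lets clearWALEntryBatch() watch two threads (shipper and reader) under one overall timeout and then log and self-interrupt if they never stop, which is the behaviour the warning message above describes. The test that follows drives this path end to end by queueing a batch and then terminating the source.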
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java
index 15f202f0646..3cedd7f9126 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java
@@ -21,7 +21,10 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.OptionalLong;
 import java.util.UUID;
 import java.util.concurrent.ExecutorService;
@@ -268,6 +271,47 @@
     }
   }

+  @Test
+  public void testTerminateClearsBuffer() throws Exception {
+    ReplicationSource source = new ReplicationSource();
+    ReplicationSourceManager mockManager = mock(ReplicationSourceManager.class);
+    MetricsReplicationGlobalSourceSource mockMetrics =
+      mock(MetricsReplicationGlobalSourceSource.class);
+    AtomicLong buffer = new AtomicLong();
+    Mockito.when(mockManager.getTotalBufferUsed()).thenReturn(buffer);
+    Mockito.when(mockManager.getGlobalMetrics()).thenReturn(mockMetrics);
+    ReplicationPeer mockPeer = mock(ReplicationPeer.class);
+    Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L);
+    Configuration testConf = HBaseConfiguration.create();
+    source.init(testConf, null, mockManager, null, mockPeer, null,
+      "testPeer", null, p -> OptionalLong.empty(), mock(MetricsSource.class));
+    ReplicationSourceWALReader reader = new ReplicationSourceWALReader(null,
+      conf, null, 0, null, source);
+    ReplicationSourceShipper shipper =
+      new ReplicationSourceShipper(conf, null, null, source);
+    shipper.entryReader = reader;
+    source.workerThreads.put("testPeer", shipper);
+    WALEntryBatch batch = new WALEntryBatch(10, logDir);
+    WAL.Entry mockEntry = mock(WAL.Entry.class);
+    WALEdit mockEdit = mock(WALEdit.class);
+    WALKeyImpl mockKey = mock(WALKeyImpl.class);
+    when(mockEntry.getEdit()).thenReturn(mockEdit);
+    when(mockEdit.isEmpty()).thenReturn(false);
+    when(mockEntry.getKey()).thenReturn(mockKey);
+    when(mockKey.estimatedSerializedSizeOf()).thenReturn(1000L);
+    when(mockEdit.heapSize()).thenReturn(10000L);
+    when(mockEdit.size()).thenReturn(0);
+    ArrayList<Cell> cells = new ArrayList<>();
+    KeyValue kv = new KeyValue(Bytes.toBytes("0001"), Bytes.toBytes("f"),
+      Bytes.toBytes("1"), Bytes.toBytes("v1"));
+    cells.add(kv);
+    when(mockEdit.getCells()).thenReturn(cells);
+    reader.addEntryToBatch(batch, mockEntry);
+    reader.entryBatchQueue.put(batch);
+    source.terminate("test");
+    assertEquals(0, source.getSourceManager().getTotalBufferUsed().get());
+  }
+
   /**
    * Tests that recovered queues are preserved on a regionserver shutdown.
    * See HBASE-18192
@@ -407,12 +451,12 @@
     ServerName deadServer = ServerName.valueOf("www.deadServer.com", 12006, 1524679704419L);
     PriorityBlockingQueue<Path> queue = new PriorityBlockingQueue<>();
     queue.put(new Path("/www/html/test"));
-    RecoveredReplicationSource source = Mockito.mock(RecoveredReplicationSource.class);
-    Server server = Mockito.mock(Server.class);
+    RecoveredReplicationSource source = mock(RecoveredReplicationSource.class);
+    Server server = mock(Server.class);
     Mockito.when(server.getServerName()).thenReturn(serverName);
     Mockito.when(source.getServer()).thenReturn(server);
     Mockito.when(source.getServerWALsBelongTo()).thenReturn(deadServer);
-    ReplicationQueueStorage storage = Mockito.mock(ReplicationQueueStorage.class);
+    ReplicationQueueStorage storage = mock(ReplicationQueueStorage.class);
     Mockito.when(storage.getWALPosition(Mockito.eq(serverName), Mockito.any(), Mockito.any()))
       .thenReturn(1001L);
     Mockito.when(storage.getWALPosition(Mockito.eq(deadServer), Mockito.any(), Mockito.any()))