diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index c87ca191138..fadccb741d6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -700,6 +700,7 @@ public class ReplicationSource implements ReplicationSourceInterface { Threads.shutdown(initThread, this.sleepForRetries); } Collection workers = workerThreads.values(); + for (ReplicationSourceShipper worker : workers) { worker.stopWorker(); if(worker.entryReader != null) { @@ -710,6 +711,7 @@ public class ReplicationSource implements ReplicationSourceInterface { if (this.replicationEndpoint != null) { this.replicationEndpoint.stop(); } + for (ReplicationSourceShipper worker : workers) { if (worker.isAlive() || worker.entryReader.isAlive()) { try { @@ -728,6 +730,9 @@ public class ReplicationSource implements ReplicationSourceInterface { worker.entryReader.interrupt(); } } + //If worker is already stopped but there was still entries batched, + //we need to clear buffer used for non processed entries + worker.clearWALEntryBatch(); } if (join) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java index f0202ef95f9..5d4a71b260c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java @@ -21,6 +21,8 @@ import static org.apache.hadoop.hbase.replication.ReplicationUtils.getAdaptiveTi import java.io.IOException; import java.util.List; import java.util.concurrent.PriorityBlockingQueue; +import java.util.concurrent.atomic.LongAccumulator; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; @@ -54,7 +56,7 @@ public class ReplicationSourceShipper extends Thread { private final Configuration conf; protected final String walGroupId; protected final PriorityBlockingQueue queue; - private final ReplicationSourceInterface source; + private final ReplicationSource source; // Last position in the log that we sent to ZooKeeper // It will be accessed by the stats thread so make it volatile @@ -74,7 +76,7 @@ public class ReplicationSourceShipper extends Thread { private final int shipEditsTimeout; public ReplicationSourceShipper(Configuration conf, String walGroupId, - PriorityBlockingQueue queue, ReplicationSourceInterface source) { + PriorityBlockingQueue queue, ReplicationSource source) { this.conf = conf; this.walGroupId = walGroupId; this.queue = queue; @@ -125,6 +127,7 @@ public class ReplicationSourceShipper extends Thread { if (!isFinished()) { setWorkerState(WorkerState.STOPPED); } else { + source.workerThreads.remove(this.walGroupId); postFinish(); } } @@ -330,4 +333,56 @@ public class ReplicationSourceShipper extends Thread { } return sleepMultiplier < maxRetriesMultiplier; } + + /** + * Attempts to properly update ReplicationSourceManager.totalBufferUser, + * in case there were unprocessed entries batched by the reader to the shipper, + * but the shipper didn't manage to ship those because the replication source is being terminated. + * In that case, it iterates through the batched entries and decrease the pending + * entries size from ReplicationSourceManager.totalBufferUser + *

+ * NOTES + * 1) This method should only be called upon replication source termination. + * It blocks waiting for both shipper and reader threads termination, + * to make sure no race conditions + * when updating ReplicationSourceManager.totalBufferUser. + * + * 2) It does not attempt to terminate reader and shipper threads. Those must + * have been triggered interruption/termination prior to calling this method. + */ + void clearWALEntryBatch() { + long timeout = System.currentTimeMillis() + this.shipEditsTimeout; + while(this.isAlive() || this.entryReader.isAlive()){ + try { + if (System.currentTimeMillis() >= timeout) { + LOG.warn("Shipper clearWALEntryBatch method timed out whilst waiting reader/shipper " + + "thread to stop. Not cleaning buffer usage. Shipper alive: {}; Reader alive: {}", + this.source.getPeerId(), this.isAlive(), this.entryReader.isAlive()); + return; + } else { + // Wait both shipper and reader threads to stop + Thread.sleep(this.sleepForRetries); + } + } catch (InterruptedException e) { + LOG.warn("{} Interrupted while waiting {} to stop on clearWALEntryBatch. " + + "Not cleaning buffer usage: {}", this.source.getPeerId(), this.getName(), e); + return; + } + } + LongAccumulator totalToDecrement = new LongAccumulator((a,b) -> a + b, 0); + entryReader.entryBatchQueue.forEach(w -> { + entryReader.entryBatchQueue.remove(w); + w.getWalEntries().forEach(e -> { + long entrySizeExcludeBulkLoad = ReplicationSourceWALReader.getEntrySizeExcludeBulkLoad(e); + totalToDecrement.accumulate(entrySizeExcludeBulkLoad); + }); + }); + if( LOG.isTraceEnabled()) { + LOG.trace("Decrementing totalBufferUsed by {}B while stopping Replication WAL Readers.", + totalToDecrement.longValue()); + } + long newBufferUsed = source.getSourceManager().getTotalBufferUsed() + .addAndGet(-totalToDecrement.longValue()); + source.getSourceManager().getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java index dd5b4dc3008..1ff0a8b1b7b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java @@ -60,7 +60,8 @@ class ReplicationSourceWALReader extends Thread { private final WALEntryFilter filter; private final ReplicationSource source; - private final BlockingQueue entryBatchQueue; + @InterfaceAudience.Private + final BlockingQueue entryBatchQueue; // max (heap) size of each batch - multiply by number of batches in queue to get total private final long replicationBatchSizeCapacity; // max count of each batch - multiply by number of batches in queue to get total diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java index 607c8379fcd..2b4b1efdc7f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java @@ -25,7 +25,9 @@ import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; + import java.io.IOException; +import java.util.ArrayList; import java.util.OptionalLong; import java.util.UUID; import java.util.concurrent.ExecutorService; @@ -131,6 +133,8 @@ public class TestReplicationSource { when(mockPeer.getPeerConfig()).thenReturn(peerConfig); ReplicationSourceManager manager = mock(ReplicationSourceManager.class); when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong()); + Mockito.when(manager.getGlobalMetrics()). + thenReturn(mock(MetricsReplicationGlobalSourceSource.class)); String queueId = "qid"; RegionServerServices rss = TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1")); @@ -272,6 +276,47 @@ public class TestReplicationSource { } } + @Test + public void testTerminateClearsBuffer() throws Exception { + ReplicationSource source = new ReplicationSource(); + ReplicationSourceManager mockManager = mock(ReplicationSourceManager.class); + MetricsReplicationGlobalSourceSource mockMetrics = + mock(MetricsReplicationGlobalSourceSource.class); + AtomicLong buffer = new AtomicLong(); + Mockito.when(mockManager.getTotalBufferUsed()).thenReturn(buffer); + Mockito.when(mockManager.getGlobalMetrics()).thenReturn(mockMetrics); + ReplicationPeer mockPeer = mock(ReplicationPeer.class); + Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L); + Configuration testConf = HBaseConfiguration.create(); + source.init(testConf, null, mockManager, null, mockPeer, null, + "testPeer", null, p -> OptionalLong.empty(), mock(MetricsSource.class)); + ReplicationSourceWALReader reader = new ReplicationSourceWALReader(null, + conf, null, 0, null, source); + ReplicationSourceShipper shipper = + new ReplicationSourceShipper(conf, null, null, source); + shipper.entryReader = reader; + source.workerThreads.put("testPeer", shipper); + WALEntryBatch batch = new WALEntryBatch(10, logDir); + WAL.Entry mockEntry = mock(WAL.Entry.class); + WALEdit mockEdit = mock(WALEdit.class); + WALKeyImpl mockKey = mock(WALKeyImpl.class); + when(mockEntry.getEdit()).thenReturn(mockEdit); + when(mockEdit.isEmpty()).thenReturn(false); + when(mockEntry.getKey()).thenReturn(mockKey); + when(mockKey.estimatedSerializedSizeOf()).thenReturn(1000L); + when(mockEdit.heapSize()).thenReturn(10000L); + when(mockEdit.size()).thenReturn(0); + ArrayList cells = new ArrayList<>(); + KeyValue kv = new KeyValue(Bytes.toBytes("0001"), Bytes.toBytes("f"), + Bytes.toBytes("1"), Bytes.toBytes("v1")); + cells.add(kv); + when(mockEdit.getCells()).thenReturn(cells); + reader.addEntryToBatch(batch, mockEntry); + reader.entryBatchQueue.put(batch); + source.terminate("test"); + assertEquals(0, source.getSourceManager().getTotalBufferUsed().get()); + } + /** * Tests that recovered queues are preserved on a regionserver shutdown. * See HBASE-18192 @@ -471,6 +516,8 @@ public class TestReplicationSource { when(mockPeer.getPeerConfig()).thenReturn(peerConfig); ReplicationSourceManager manager = mock(ReplicationSourceManager.class); when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong()); + Mockito.when(manager.getGlobalMetrics()). + thenReturn(mock(MetricsReplicationGlobalSourceSource.class)); String queueId = "qid"; RegionServerServices rss = TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1"));