From e88509841f1d4d8080ff539d5308b6bed0ac8595 Mon Sep 17 00:00:00 2001 From: Duo Zhang Date: Tue, 6 Oct 2020 21:09:00 +0800 Subject: [PATCH] =?UTF-8?q?Revert=20"HBASE-24813=20ReplicationSource=20sho?= =?UTF-8?q?uld=20clear=20buffer=20usage=20on=20Replicatio=E2=80=A6=20(#219?= =?UTF-8?q?1)"?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This reverts commit 687e53b7e49c1a149e55829bbeca6aa4edfe69e7. --- .../regionserver/ReplicationSource.java | 11 ++-- .../ReplicationSourceShipper.java | 51 ------------------- .../ReplicationSourceWALReader.java | 3 +- .../regionserver/TestReplicationSource.java | 50 ++---------------- 4 files changed, 9 insertions(+), 106 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index 4b034f56a93..b68e0587d7d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -684,12 +684,14 @@ public class ReplicationSource implements ReplicationSourceInterface { Threads.shutdown(initThread, this.sleepForRetries); } Collection workers = workerThreads.values(); - for (ReplicationSourceShipper worker : workers) { worker.stopWorker(); - if (worker.entryReader != null) { + if(worker.entryReader != null) { worker.entryReader.setReaderRunning(false); } + } + + for (ReplicationSourceShipper worker : workers) { if (worker.isAlive() || worker.entryReader.isAlive()) { try { // Wait worker to stop @@ -707,9 +709,6 @@ public class ReplicationSource implements ReplicationSourceInterface { worker.entryReader.interrupt(); } } - //If worker is already stopped but there was still entries batched, - //we need to clear buffer used for non processed entries - worker.clearWALEntryBatch(); } if (this.replicationEndpoint != null) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java index bb552757734..45eb91c2e72 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java @@ -23,8 +23,6 @@ import static org.apache.hadoop.hbase.replication.ReplicationUtils.sleepForRetri import java.io.IOException; import java.util.List; import java.util.concurrent.PriorityBlockingQueue; -import java.util.concurrent.atomic.LongAccumulator; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; @@ -327,53 +325,4 @@ public class ReplicationSourceShipper extends Thread { public boolean isFinished() { return state == WorkerState.FINISHED; } - - /** - * Attempts to properly update ReplicationSourceManager.totalBufferUser, - * in case there were unprocessed entries batched by the reader to the shipper, - * but the shipper didn't manage to ship those because the replication source is being terminated. - * In that case, it iterates through the batched entries and decrease the pending - * entries size from ReplicationSourceManager.totalBufferUser - *

- * NOTES - * 1) This method should only be called upon replication source termination. - * It blocks waiting for both shipper and reader threads termination, - * to make sure no race conditions - * when updating ReplicationSourceManager.totalBufferUser. - * - * 2) It does not attempt to terminate reader and shipper threads. Those must - * have been triggered interruption/termination prior to calling this method. - */ - void clearWALEntryBatch() { - long timeout = System.currentTimeMillis() + this.shipEditsTimeout; - while(this.isAlive() || this.entryReader.isAlive()){ - try { - if (System.currentTimeMillis() >= timeout) { - LOG.warn("Interrupting source thread for peer {} without cleaning buffer usage " - + "because clearWALEntryBatch method timed out whilst waiting reader/shipper " - + "thread to stop.", this.source.getPeerId()); - Thread.currentThread().interrupt(); - } else { - // Wait both shipper and reader threads to stop - Thread.sleep(this.sleepForRetries); - } - } catch (InterruptedException e) { - LOG.warn("{} Interrupted while waiting {} to stop on clearWALEntryBatch: {}", - this.source.getPeerId(), this.getName(), e); - Thread.currentThread().interrupt(); - } - } - LongAccumulator totalToDecrement = new LongAccumulator((a,b) -> a + b, 0); - entryReader.entryBatchQueue.forEach(w -> { - entryReader.entryBatchQueue.remove(w); - w.getWalEntries().forEach(e -> { - long entrySizeExcludeBulkLoad = entryReader.getEntrySizeExcludeBulkLoad(e); - totalToDecrement.accumulate(entrySizeExcludeBulkLoad); - }); - }); - - LOG.trace("Decrementing totalBufferUsed by {}B while stopping Replication WAL Readers.", - totalToDecrement.longValue()); - source.getSourceManager().getTotalBufferUsed().addAndGet(-totalToDecrement.longValue()); - } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java index a6d87870b49..c71db1bf785 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java @@ -60,8 +60,7 @@ class ReplicationSourceWALReader extends Thread { private final WALEntryFilter filter; private final ReplicationSource source; - @InterfaceAudience.Private - final BlockingQueue entryBatchQueue; + private final BlockingQueue entryBatchQueue; // max (heap) size of each batch - multiply by number of batches in queue to get total private final long replicationBatchSizeCapacity; // max count of each batch - multiply by number of batches in queue to get total diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java index ded9e8f28e2..14558926a46 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java @@ -22,10 +22,7 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; import java.io.IOException; -import java.util.ArrayList; import java.util.OptionalLong; import java.util.UUID; import java.util.concurrent.ExecutorService; @@ -272,47 +269,6 @@ public class TestReplicationSource { } } - @Test - public void testTerminateClearsBuffer() throws Exception { - ReplicationSource source = new ReplicationSource(); - ReplicationSourceManager mockManager = mock(ReplicationSourceManager.class); - MetricsReplicationGlobalSourceSource mockMetrics = - mock(MetricsReplicationGlobalSourceSource.class); - AtomicLong buffer = new AtomicLong(); - Mockito.when(mockManager.getTotalBufferUsed()).thenReturn(buffer); - Mockito.when(mockManager.getGlobalMetrics()).thenReturn(mockMetrics); - ReplicationPeer mockPeer = mock(ReplicationPeer.class); - Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L); - Configuration testConf = HBaseConfiguration.create(); - source.init(testConf, null, mockManager, null, mockPeer, null, - "testPeer", null, p -> OptionalLong.empty(), mock(MetricsSource.class)); - ReplicationSourceWALReader reader = new ReplicationSourceWALReader(null, - conf, null, 0, null, source); - ReplicationSourceShipper shipper = - new ReplicationSourceShipper(conf, null, null, source); - shipper.entryReader = reader; - source.workerThreads.put("testPeer", shipper); - WALEntryBatch batch = new WALEntryBatch(10, logDir); - WAL.Entry mockEntry = mock(WAL.Entry.class); - WALEdit mockEdit = mock(WALEdit.class); - WALKeyImpl mockKey = mock(WALKeyImpl.class); - when(mockEntry.getEdit()).thenReturn(mockEdit); - when(mockEdit.isEmpty()).thenReturn(false); - when(mockEntry.getKey()).thenReturn(mockKey); - when(mockKey.estimatedSerializedSizeOf()).thenReturn(1000L); - when(mockEdit.heapSize()).thenReturn(10000L); - when(mockEdit.size()).thenReturn(0); - ArrayList cells = new ArrayList<>(); - KeyValue kv = new KeyValue(Bytes.toBytes("0001"), Bytes.toBytes("f"), - Bytes.toBytes("1"), Bytes.toBytes("v1")); - cells.add(kv); - when(mockEdit.getCells()).thenReturn(cells); - reader.addEntryToBatch(batch, mockEntry); - reader.entryBatchQueue.put(batch); - source.terminate("test"); - assertEquals(0, source.getSourceManager().getTotalBufferUsed().get()); - } - /** * Tests that recovered queues are preserved on a regionserver shutdown. * See HBASE-18192 @@ -482,12 +438,12 @@ public class TestReplicationSource { ServerName deadServer = ServerName.valueOf("www.deadServer.com", 12006, 1524679704419L); PriorityBlockingQueue queue = new PriorityBlockingQueue<>(); queue.put(new Path("/www/html/test")); - RecoveredReplicationSource source = mock(RecoveredReplicationSource.class); - Server server = mock(Server.class); + RecoveredReplicationSource source = Mockito.mock(RecoveredReplicationSource.class); + Server server = Mockito.mock(Server.class); Mockito.when(server.getServerName()).thenReturn(serverName); Mockito.when(source.getServer()).thenReturn(server); Mockito.when(source.getServerWALsBelongTo()).thenReturn(deadServer); - ReplicationQueueStorage storage = mock(ReplicationQueueStorage.class); + ReplicationQueueStorage storage = Mockito.mock(ReplicationQueueStorage.class); Mockito.when(storage.getWALPosition(Mockito.eq(serverName), Mockito.any(), Mockito.any())) .thenReturn(1001L); Mockito.when(storage.getWALPosition(Mockito.eq(deadServer), Mockito.any(), Mockito.any()))