Revert "HBASE-24813 ReplicationSource should clear buffer usage on Replicatio… (#2191)"

This reverts commit 687e53b7e4.
Duo Zhang 2020-10-06 21:09:00 +08:00
parent 14b523ec98
commit e88509841f
4 changed files with 9 additions and 106 deletions

hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java

@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements. See the NOTICE file
  * distributed with this work for additional information
@@ -684,12 +684,14 @@ public class ReplicationSource implements ReplicationSourceInterface {
       Threads.shutdown(initThread, this.sleepForRetries);
     }
     Collection<ReplicationSourceShipper> workers = workerThreads.values();
     for (ReplicationSourceShipper worker : workers) {
       worker.stopWorker();
-      if (worker.entryReader != null) {
+      if(worker.entryReader != null) {
         worker.entryReader.setReaderRunning(false);
       }
+    }
+    for (ReplicationSourceShipper worker : workers) {
       if (worker.isAlive() || worker.entryReader.isAlive()) {
         try {
           // Wait worker to stop
@@ -707,9 +709,6 @@ public class ReplicationSource implements ReplicationSourceInterface {
           worker.entryReader.interrupt();
         }
       }
-      //If worker is already stopped but there was still entries batched,
-      //we need to clear buffer used for non processed entries
-      worker.clearWALEntryBatch();
     }
     if (this.replicationEndpoint != null) {

hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java

@@ -23,8 +23,6 @@ import static org.apache.hadoop.hbase.replication.ReplicationUtils.sleepForRetri
 import java.io.IOException;
 import java.util.List;
 import java.util.concurrent.PriorityBlockingQueue;
-import java.util.concurrent.atomic.LongAccumulator;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
@@ -327,53 +325,4 @@ public class ReplicationSourceShipper extends Thread {
   public boolean isFinished() {
     return state == WorkerState.FINISHED;
   }
-  /**
-   * Attempts to properly update <code>ReplicationSourceManager.totalBufferUser</code>,
-   * in case there were unprocessed entries batched by the reader to the shipper,
-   * but the shipper didn't manage to ship those because the replication source is being terminated.
-   * In that case, it iterates through the batched entries and decrease the pending
-   * entries size from <code>ReplicationSourceManager.totalBufferUser</code>
-   * <p/>
-   * <b>NOTES</b>
-   * 1) This method should only be called upon replication source termination.
-   * It blocks waiting for both shipper and reader threads termination,
-   * to make sure no race conditions
-   * when updating <code>ReplicationSourceManager.totalBufferUser</code>.
-   *
-   * 2) It <b>does not</b> attempt to terminate reader and shipper threads. Those <b>must</b>
-   * have been triggered interruption/termination prior to calling this method.
-   */
-  void clearWALEntryBatch() {
-    long timeout = System.currentTimeMillis() + this.shipEditsTimeout;
-    while(this.isAlive() || this.entryReader.isAlive()){
-      try {
-        if (System.currentTimeMillis() >= timeout) {
-          LOG.warn("Interrupting source thread for peer {} without cleaning buffer usage "
-              + "because clearWALEntryBatch method timed out whilst waiting reader/shipper "
-              + "thread to stop.", this.source.getPeerId());
-          Thread.currentThread().interrupt();
-        } else {
-          // Wait both shipper and reader threads to stop
-          Thread.sleep(this.sleepForRetries);
-        }
-      } catch (InterruptedException e) {
-        LOG.warn("{} Interrupted while waiting {} to stop on clearWALEntryBatch: {}",
-          this.source.getPeerId(), this.getName(), e);
-        Thread.currentThread().interrupt();
-      }
-    }
-    LongAccumulator totalToDecrement = new LongAccumulator((a,b) -> a + b, 0);
-    entryReader.entryBatchQueue.forEach(w -> {
-      entryReader.entryBatchQueue.remove(w);
-      w.getWalEntries().forEach(e -> {
-        long entrySizeExcludeBulkLoad = entryReader.getEntrySizeExcludeBulkLoad(e);
-        totalToDecrement.accumulate(entrySizeExcludeBulkLoad);
-      });
-    });
-    LOG.trace("Decrementing totalBufferUsed by {}B while stopping Replication WAL Readers.",
-      totalToDecrement.longValue());
-    source.getSourceManager().getTotalBufferUsed().addAndGet(-totalToDecrement.longValue());
-  }
 }

hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java

@@ -60,8 +60,7 @@ class ReplicationSourceWALReader extends Thread {
   private final WALEntryFilter filter;
   private final ReplicationSource source;
-  @InterfaceAudience.Private
-  final BlockingQueue<WALEntryBatch> entryBatchQueue;
+  private final BlockingQueue<WALEntryBatch> entryBatchQueue;
   // max (heap) size of each batch - multiply by number of batches in queue to get total
   private final long replicationBatchSizeCapacity;
   // max count of each batch - multiply by number of batches in queue to get total

hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java

@@ -22,10 +22,7 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.OptionalLong;
 import java.util.UUID;
 import java.util.concurrent.ExecutorService;
@@ -272,47 +269,6 @@ public class TestReplicationSource {
     }
   }
-  @Test
-  public void testTerminateClearsBuffer() throws Exception {
-    ReplicationSource source = new ReplicationSource();
-    ReplicationSourceManager mockManager = mock(ReplicationSourceManager.class);
-    MetricsReplicationGlobalSourceSource mockMetrics =
-      mock(MetricsReplicationGlobalSourceSource.class);
-    AtomicLong buffer = new AtomicLong();
-    Mockito.when(mockManager.getTotalBufferUsed()).thenReturn(buffer);
-    Mockito.when(mockManager.getGlobalMetrics()).thenReturn(mockMetrics);
-    ReplicationPeer mockPeer = mock(ReplicationPeer.class);
-    Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L);
-    Configuration testConf = HBaseConfiguration.create();
-    source.init(testConf, null, mockManager, null, mockPeer, null,
-      "testPeer", null, p -> OptionalLong.empty(), mock(MetricsSource.class));
-    ReplicationSourceWALReader reader = new ReplicationSourceWALReader(null,
-      conf, null, 0, null, source);
-    ReplicationSourceShipper shipper =
-      new ReplicationSourceShipper(conf, null, null, source);
-    shipper.entryReader = reader;
-    source.workerThreads.put("testPeer", shipper);
-    WALEntryBatch batch = new WALEntryBatch(10, logDir);
-    WAL.Entry mockEntry = mock(WAL.Entry.class);
-    WALEdit mockEdit = mock(WALEdit.class);
-    WALKeyImpl mockKey = mock(WALKeyImpl.class);
-    when(mockEntry.getEdit()).thenReturn(mockEdit);
-    when(mockEdit.isEmpty()).thenReturn(false);
-    when(mockEntry.getKey()).thenReturn(mockKey);
-    when(mockKey.estimatedSerializedSizeOf()).thenReturn(1000L);
-    when(mockEdit.heapSize()).thenReturn(10000L);
-    when(mockEdit.size()).thenReturn(0);
-    ArrayList<Cell> cells = new ArrayList<>();
-    KeyValue kv = new KeyValue(Bytes.toBytes("0001"), Bytes.toBytes("f"),
-      Bytes.toBytes("1"), Bytes.toBytes("v1"));
-    cells.add(kv);
-    when(mockEdit.getCells()).thenReturn(cells);
-    reader.addEntryToBatch(batch, mockEntry);
-    reader.entryBatchQueue.put(batch);
-    source.terminate("test");
-    assertEquals(0, source.getSourceManager().getTotalBufferUsed().get());
-  }
   /**
    * Tests that recovered queues are preserved on a regionserver shutdown.
    * See HBASE-18192
@@ -482,12 +438,12 @@
     ServerName deadServer = ServerName.valueOf("www.deadServer.com", 12006, 1524679704419L);
     PriorityBlockingQueue<Path> queue = new PriorityBlockingQueue<>();
     queue.put(new Path("/www/html/test"));
-    RecoveredReplicationSource source = mock(RecoveredReplicationSource.class);
-    Server server = mock(Server.class);
+    RecoveredReplicationSource source = Mockito.mock(RecoveredReplicationSource.class);
+    Server server = Mockito.mock(Server.class);
     Mockito.when(server.getServerName()).thenReturn(serverName);
     Mockito.when(source.getServer()).thenReturn(server);
     Mockito.when(source.getServerWALsBelongTo()).thenReturn(deadServer);
-    ReplicationQueueStorage storage = mock(ReplicationQueueStorage.class);
+    ReplicationQueueStorage storage = Mockito.mock(ReplicationQueueStorage.class);
     Mockito.when(storage.getWALPosition(Mockito.eq(serverName), Mockito.any(), Mockito.any()))
       .thenReturn(1001L);
     Mockito.when(storage.getWALPosition(Mockito.eq(deadServer), Mockito.any(), Mockito.any()))