HBASE-24813 ReplicationSource should clear buffer usage on Replicatio… (#2546)
Signed-off-by: Ankit Singhal <ankit@apache.org>
This commit is contained in:
parent
481662ab39
commit
85842634e5
|
@ -686,6 +686,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
|
||||||
Threads.shutdown(initThread, this.sleepForRetries);
|
Threads.shutdown(initThread, this.sleepForRetries);
|
||||||
}
|
}
|
||||||
Collection<ReplicationSourceShipper> workers = workerThreads.values();
|
Collection<ReplicationSourceShipper> workers = workerThreads.values();
|
||||||
|
|
||||||
for (ReplicationSourceShipper worker : workers) {
|
for (ReplicationSourceShipper worker : workers) {
|
||||||
worker.stopWorker();
|
worker.stopWorker();
|
||||||
if(worker.entryReader != null) {
|
if(worker.entryReader != null) {
|
||||||
|
@ -696,6 +697,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
|
||||||
if (this.replicationEndpoint != null) {
|
if (this.replicationEndpoint != null) {
|
||||||
this.replicationEndpoint.stop();
|
this.replicationEndpoint.stop();
|
||||||
}
|
}
|
||||||
|
|
||||||
for (ReplicationSourceShipper worker : workers) {
|
for (ReplicationSourceShipper worker : workers) {
|
||||||
if (worker.isAlive() || worker.entryReader.isAlive()) {
|
if (worker.isAlive() || worker.entryReader.isAlive()) {
|
||||||
try {
|
try {
|
||||||
|
@ -714,6 +716,9 @@ public class ReplicationSource implements ReplicationSourceInterface {
|
||||||
worker.entryReader.interrupt();
|
worker.entryReader.interrupt();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
//If worker is already stopped but there was still entries batched,
|
||||||
|
//we need to clear buffer used for non processed entries
|
||||||
|
worker.clearWALEntryBatch();
|
||||||
}
|
}
|
||||||
|
|
||||||
if (join) {
|
if (join) {
|
||||||
|
|
|
@ -22,6 +22,8 @@ import static org.apache.hadoop.hbase.replication.ReplicationUtils.sleepForRetri
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.concurrent.PriorityBlockingQueue;
|
import java.util.concurrent.PriorityBlockingQueue;
|
||||||
|
import java.util.concurrent.atomic.LongAccumulator;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.hbase.Cell;
|
import org.apache.hadoop.hbase.Cell;
|
||||||
|
@ -323,4 +325,56 @@ public class ReplicationSourceShipper extends Thread {
|
||||||
public boolean isFinished() {
|
public boolean isFinished() {
|
||||||
return state == WorkerState.FINISHED;
|
return state == WorkerState.FINISHED;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Attempts to properly update <code>ReplicationSourceManager.totalBufferUser</code>,
|
||||||
|
* in case there were unprocessed entries batched by the reader to the shipper,
|
||||||
|
* but the shipper didn't manage to ship those because the replication source is being terminated.
|
||||||
|
* In that case, it iterates through the batched entries and decrease the pending
|
||||||
|
* entries size from <code>ReplicationSourceManager.totalBufferUser</code>
|
||||||
|
* <p/>
|
||||||
|
* <b>NOTES</b>
|
||||||
|
* 1) This method should only be called upon replication source termination.
|
||||||
|
* It blocks waiting for both shipper and reader threads termination,
|
||||||
|
* to make sure no race conditions
|
||||||
|
* when updating <code>ReplicationSourceManager.totalBufferUser</code>.
|
||||||
|
*
|
||||||
|
* 2) It <b>does not</b> attempt to terminate reader and shipper threads. Those <b>must</b>
|
||||||
|
* have been triggered interruption/termination prior to calling this method.
|
||||||
|
*/
|
||||||
|
void clearWALEntryBatch() {
|
||||||
|
long timeout = System.currentTimeMillis() + this.shipEditsTimeout;
|
||||||
|
while(this.isAlive() || this.entryReader.isAlive()){
|
||||||
|
try {
|
||||||
|
if (System.currentTimeMillis() >= timeout) {
|
||||||
|
LOG.warn("Shipper clearWALEntryBatch method timed out whilst waiting reader/shipper "
|
||||||
|
+ "thread to stop. Not cleaning buffer usage. Shipper alive: {}; Reader alive: {}",
|
||||||
|
this.source.getPeerId(), this.isAlive(), this.entryReader.isAlive());
|
||||||
|
return;
|
||||||
|
} else {
|
||||||
|
// Wait both shipper and reader threads to stop
|
||||||
|
Thread.sleep(this.sleepForRetries);
|
||||||
|
}
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
LOG.warn("{} Interrupted while waiting {} to stop on clearWALEntryBatch. "
|
||||||
|
+ "Not cleaning buffer usage: {}", this.source.getPeerId(), this.getName(), e);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
LongAccumulator totalToDecrement = new LongAccumulator((a,b) -> a + b, 0);
|
||||||
|
entryReader.entryBatchQueue.forEach(w -> {
|
||||||
|
entryReader.entryBatchQueue.remove(w);
|
||||||
|
w.getWalEntries().forEach(e -> {
|
||||||
|
long entrySizeExcludeBulkLoad = ReplicationSourceWALReader.getEntrySizeExcludeBulkLoad(e);
|
||||||
|
totalToDecrement.accumulate(entrySizeExcludeBulkLoad);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
if( LOG.isTraceEnabled()) {
|
||||||
|
LOG.trace("Decrementing totalBufferUsed by {}B while stopping Replication WAL Readers.",
|
||||||
|
totalToDecrement.longValue());
|
||||||
|
}
|
||||||
|
long newBufferUsed = source.getSourceManager().getTotalBufferUsed()
|
||||||
|
.addAndGet(-totalToDecrement.longValue());
|
||||||
|
source.getSourceManager().getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -60,7 +60,8 @@ class ReplicationSourceWALReader extends Thread {
|
||||||
private final WALEntryFilter filter;
|
private final WALEntryFilter filter;
|
||||||
private final ReplicationSource source;
|
private final ReplicationSource source;
|
||||||
|
|
||||||
private final BlockingQueue<WALEntryBatch> entryBatchQueue;
|
@InterfaceAudience.Private
|
||||||
|
final BlockingQueue<WALEntryBatch> entryBatchQueue;
|
||||||
// max (heap) size of each batch - multiply by number of batches in queue to get total
|
// max (heap) size of each batch - multiply by number of batches in queue to get total
|
||||||
private final long replicationBatchSizeCapacity;
|
private final long replicationBatchSizeCapacity;
|
||||||
// max count of each batch - multiply by number of batches in queue to get total
|
// max count of each batch - multiply by number of batches in queue to get total
|
||||||
|
|
|
@ -22,7 +22,10 @@ import static org.junit.Assert.assertFalse;
|
||||||
import static org.junit.Assert.assertNotNull;
|
import static org.junit.Assert.assertNotNull;
|
||||||
import static org.junit.Assert.assertNull;
|
import static org.junit.Assert.assertNull;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
import static org.mockito.Mockito.mock;
|
||||||
|
import static org.mockito.Mockito.when;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.util.ArrayList;
|
||||||
import java.util.OptionalLong;
|
import java.util.OptionalLong;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
|
@ -128,6 +131,8 @@ public class TestReplicationSource {
|
||||||
Mockito.when(mockPeer.getPeerConfig()).thenReturn(peerConfig);
|
Mockito.when(mockPeer.getPeerConfig()).thenReturn(peerConfig);
|
||||||
ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class);
|
ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class);
|
||||||
Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong());
|
Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong());
|
||||||
|
Mockito.when(manager.getGlobalMetrics()).
|
||||||
|
thenReturn(mock(MetricsReplicationGlobalSourceSource.class));
|
||||||
String queueId = "qid";
|
String queueId = "qid";
|
||||||
RegionServerServices rss =
|
RegionServerServices rss =
|
||||||
TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1"));
|
TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1"));
|
||||||
|
@ -269,6 +274,47 @@ public class TestReplicationSource {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testTerminateClearsBuffer() throws Exception {
|
||||||
|
ReplicationSource source = new ReplicationSource();
|
||||||
|
ReplicationSourceManager mockManager = mock(ReplicationSourceManager.class);
|
||||||
|
MetricsReplicationGlobalSourceSource mockMetrics =
|
||||||
|
mock(MetricsReplicationGlobalSourceSource.class);
|
||||||
|
AtomicLong buffer = new AtomicLong();
|
||||||
|
Mockito.when(mockManager.getTotalBufferUsed()).thenReturn(buffer);
|
||||||
|
Mockito.when(mockManager.getGlobalMetrics()).thenReturn(mockMetrics);
|
||||||
|
ReplicationPeer mockPeer = mock(ReplicationPeer.class);
|
||||||
|
Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L);
|
||||||
|
Configuration testConf = HBaseConfiguration.create();
|
||||||
|
source.init(testConf, null, mockManager, null, mockPeer, null,
|
||||||
|
"testPeer", null, p -> OptionalLong.empty(), mock(MetricsSource.class));
|
||||||
|
ReplicationSourceWALReader reader = new ReplicationSourceWALReader(null,
|
||||||
|
conf, null, 0, null, source);
|
||||||
|
ReplicationSourceShipper shipper =
|
||||||
|
new ReplicationSourceShipper(conf, null, null, source);
|
||||||
|
shipper.entryReader = reader;
|
||||||
|
source.workerThreads.put("testPeer", shipper);
|
||||||
|
WALEntryBatch batch = new WALEntryBatch(10, logDir);
|
||||||
|
WAL.Entry mockEntry = mock(WAL.Entry.class);
|
||||||
|
WALEdit mockEdit = mock(WALEdit.class);
|
||||||
|
WALKeyImpl mockKey = mock(WALKeyImpl.class);
|
||||||
|
when(mockEntry.getEdit()).thenReturn(mockEdit);
|
||||||
|
when(mockEdit.isEmpty()).thenReturn(false);
|
||||||
|
when(mockEntry.getKey()).thenReturn(mockKey);
|
||||||
|
when(mockKey.estimatedSerializedSizeOf()).thenReturn(1000L);
|
||||||
|
when(mockEdit.heapSize()).thenReturn(10000L);
|
||||||
|
when(mockEdit.size()).thenReturn(0);
|
||||||
|
ArrayList<Cell> cells = new ArrayList<>();
|
||||||
|
KeyValue kv = new KeyValue(Bytes.toBytes("0001"), Bytes.toBytes("f"),
|
||||||
|
Bytes.toBytes("1"), Bytes.toBytes("v1"));
|
||||||
|
cells.add(kv);
|
||||||
|
when(mockEdit.getCells()).thenReturn(cells);
|
||||||
|
reader.addEntryToBatch(batch, mockEntry);
|
||||||
|
reader.entryBatchQueue.put(batch);
|
||||||
|
source.terminate("test");
|
||||||
|
assertEquals(0, source.getSourceManager().getTotalBufferUsed().get());
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Tests that recovered queues are preserved on a regionserver shutdown.
|
* Tests that recovered queues are preserved on a regionserver shutdown.
|
||||||
* See HBASE-18192
|
* See HBASE-18192
|
||||||
|
@ -438,12 +484,12 @@ public class TestReplicationSource {
|
||||||
ServerName deadServer = ServerName.valueOf("www.deadServer.com", 12006, 1524679704419L);
|
ServerName deadServer = ServerName.valueOf("www.deadServer.com", 12006, 1524679704419L);
|
||||||
PriorityBlockingQueue<Path> queue = new PriorityBlockingQueue<>();
|
PriorityBlockingQueue<Path> queue = new PriorityBlockingQueue<>();
|
||||||
queue.put(new Path("/www/html/test"));
|
queue.put(new Path("/www/html/test"));
|
||||||
RecoveredReplicationSource source = Mockito.mock(RecoveredReplicationSource.class);
|
RecoveredReplicationSource source = mock(RecoveredReplicationSource.class);
|
||||||
Server server = Mockito.mock(Server.class);
|
Server server = mock(Server.class);
|
||||||
Mockito.when(server.getServerName()).thenReturn(serverName);
|
Mockito.when(server.getServerName()).thenReturn(serverName);
|
||||||
Mockito.when(source.getServer()).thenReturn(server);
|
Mockito.when(source.getServer()).thenReturn(server);
|
||||||
Mockito.when(source.getServerWALsBelongTo()).thenReturn(deadServer);
|
Mockito.when(source.getServerWALsBelongTo()).thenReturn(deadServer);
|
||||||
ReplicationQueueStorage storage = Mockito.mock(ReplicationQueueStorage.class);
|
ReplicationQueueStorage storage = mock(ReplicationQueueStorage.class);
|
||||||
Mockito.when(storage.getWALPosition(Mockito.eq(serverName), Mockito.any(), Mockito.any()))
|
Mockito.when(storage.getWALPosition(Mockito.eq(serverName), Mockito.any(), Mockito.any()))
|
||||||
.thenReturn(1001L);
|
.thenReturn(1001L);
|
||||||
Mockito.when(storage.getWALPosition(Mockito.eq(deadServer), Mockito.any(), Mockito.any()))
|
Mockito.when(storage.getWALPosition(Mockito.eq(deadServer), Mockito.any(), Mockito.any()))
|
||||||
|
@ -468,6 +514,8 @@ public class TestReplicationSource {
|
||||||
Mockito.when(mockPeer.getPeerConfig()).thenReturn(peerConfig);
|
Mockito.when(mockPeer.getPeerConfig()).thenReturn(peerConfig);
|
||||||
ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class);
|
ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class);
|
||||||
Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong());
|
Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong());
|
||||||
|
Mockito.when(manager.getGlobalMetrics()).
|
||||||
|
thenReturn(mock(MetricsReplicationGlobalSourceSource.class));
|
||||||
String queueId = "qid";
|
String queueId = "qid";
|
||||||
RegionServerServices rss =
|
RegionServerServices rss =
|
||||||
TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1"));
|
TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1"));
|
||||||
|
|
Loading…
Reference in New Issue