HBASE-27785 Encapsulate and centralize totalBufferUsed in ReplicationSourceManager (#5196)

Signed-off-by: Duo Zhang <zhangduo@apache.org>
Author: chenglei, 2023-04-21 21:52:40 +08:00 (committed by GitHub)
Parent: 92a2868f37
Commit: 0787199feb
12 changed files with 136 additions and 128 deletions
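
For orientation before the per-file hunks: the patch stops handing out the manager's AtomicLong directly (the old getTotalBufferUsed()) and instead routes all buffer accounting through a small, mostly package-private quota API on ReplicationSourceManager. The interface below is not part of the codebase; it is only a reading aid that collects the signatures from the ReplicationSourceManager hunk further down, and it assumes the org.apache.hadoop.hbase.replication.regionserver package since WALEntryBatch is package-private.

import org.apache.hadoop.hbase.wal.WAL.Entry;

// Reading aid only; signatures mirror the ReplicationSourceManager hunk below.
interface BufferQuotaSurface {
  long getTotalBufferUsed();                       // was: AtomicLong getTotalBufferUsed()
  boolean acquireWALEntryBufferQuota(WALEntryBatch walEntryBatch, Entry entry);
  long releaseWALEntryBatchBufferQuota(WALEntryBatch walEntryBatch);
  boolean acquireBufferQuota(long size);           // true: limit reached, ship buffered edits now
  void releaseBufferQuota(long size);
  boolean checkBufferQuota(String peerId);         // false: over the limit, stop reading for now
}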

ReplicationSource.java

@ -139,8 +139,6 @@ public class ReplicationSource implements ReplicationSourceInterface {
protected final ConcurrentHashMap<String, ReplicationSourceShipper> workerThreads =
new ConcurrentHashMap<>();
private AtomicLong totalBufferUsed;
public static final String WAIT_ON_ENDPOINT_SECONDS =
"hbase.replication.wait.on.endpoint.seconds";
public static final int DEFAULT_WAIT_ON_ENDPOINT_SECONDS = 30;
@ -224,7 +222,6 @@ public class ReplicationSource implements ReplicationSourceInterface {
defaultBandwidth = this.conf.getLong("replication.source.per.peer.node.bandwidth", 0);
currentBandwidth = getCurrentBandwidth();
this.throttler = new ReplicationThrottler((double) currentBandwidth / 10.0);
this.totalBufferUsed = manager.getTotalBufferUsed();
this.walFileLengthProvider = walFileLengthProvider;
this.abortOnError = this.conf.getBoolean("replication.source.regionserver.abort", true);
@ -797,14 +794,12 @@ public class ReplicationSource implements ReplicationSourceInterface {
@Override
// offsets totalBufferUsed by deducting shipped batchSize.
public void postShipEdits(List<Entry> entries, int batchSize) {
public void postShipEdits(List<Entry> entries, long batchSize) {
if (throttler.isEnabled()) {
throttler.addPushSize(batchSize);
}
totalReplicatedEdits.addAndGet(entries.size());
long newBufferUsed = totalBufferUsed.addAndGet(-batchSize);
// Record the new buffer usage
this.manager.getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed);
this.manager.releaseBufferQuota(batchSize);
}
@Override

ReplicationSourceInterface.java

@ -149,7 +149,7 @@ public interface ReplicationSourceInterface {
* @param entries pushed
* @param batchSize entries size pushed
*/
void postShipEdits(List<Entry> entries, int batchSize);
void postShipEdits(List<Entry> entries, long batchSize);
/**
* The queue of WALs only belong to one region server. This will return the server name which all

ReplicationSourceManager.java

@ -58,6 +58,7 @@ import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALProvider;
import org.apache.yetus.audience.InterfaceAudience;
@ -783,8 +784,8 @@ public class ReplicationSourceManager {
}
}
public AtomicLong getTotalBufferUsed() {
return totalBufferUsed;
public long getTotalBufferUsed() {
return totalBufferUsed.get();
}
/**
@ -834,7 +835,7 @@ public class ReplicationSourceManager {
StringBuilder stats = new StringBuilder();
// Print stats that apply across all Replication Sources
stats.append("Global stats: ");
stats.append("WAL Edits Buffer Used=").append(getTotalBufferUsed().get()).append("B, Limit=")
stats.append("WAL Edits Buffer Used=").append(getTotalBufferUsed()).append("B, Limit=")
.append(getTotalBufferLimit()).append("B\n");
for (ReplicationSourceInterface source : this.sources.values()) {
stats.append("Normal source for cluster " + source.getPeerId() + ": ");
@ -942,4 +943,80 @@ public class ReplicationSourceManager {
ReplicationQueueStorage getQueueStorage() {
return queueStorage;
}
/**
* Acquire buffer quota for an {@link Entry} that is being added to a {@link WALEntryBatch}.
* @param entry the WAL entry that is added to the {@link WALEntryBatch} and for which buffer
* quota should be acquired.
* @return true if the buffer limit has been reached and the caller should ship all buffered
* entries now.
*/
boolean acquireWALEntryBufferQuota(WALEntryBatch walEntryBatch, Entry entry) {
long entrySize = walEntryBatch.incrementUsedBufferSize(entry);
return this.acquireBufferQuota(entrySize);
}
/**
* Release the buffer quota of a {@link WALEntryBatch} that was acquired through
* {@link ReplicationSourceManager#acquireWALEntryBufferQuota}.
* @return the released buffer quota size.
*/
long releaseWALEntryBatchBufferQuota(WALEntryBatch walEntryBatch) {
long usedBufferSize = walEntryBatch.getUsedBufferSize();
if (usedBufferSize > 0) {
this.releaseBufferQuota(usedBufferSize);
}
return usedBufferSize;
}
/**
* Add the size to {@link ReplicationSourceManager#totalBufferUsed} and check whether it exceeds
* {@link ReplicationSourceManager#totalBufferLimit}.
* @return true if {@link ReplicationSourceManager#totalBufferUsed} has reached
* {@link ReplicationSourceManager#totalBufferLimit}; in that case the caller should stop
* buffering and ship all pending edits.
*/
boolean acquireBufferQuota(long size) {
if (size < 0) {
throw new IllegalArgumentException("size should not be less than 0");
}
long newBufferUsed = addTotalBufferUsed(size);
return newBufferUsed >= totalBufferLimit;
}
/**
* Release buffer quota that was acquired through
* {@link ReplicationSourceManager#acquireBufferQuota}.
*/
void releaseBufferQuota(long size) {
if (size < 0) {
throw new IllegalArgumentException("size should not be less than 0");
}
addTotalBufferUsed(-size);
}
private long addTotalBufferUsed(long size) {
if (size == 0) {
return totalBufferUsed.get();
}
long newBufferUsed = totalBufferUsed.addAndGet(size);
// Record the new buffer usage
this.globalMetrics.setWALReaderEditsBufferBytes(newBufferUsed);
return newBufferUsed;
}
/**
* Check whether {@link ReplicationSourceManager#totalBufferUsed} exceeds
* {@link ReplicationSourceManager#totalBufferLimit} for the given peer.
* @return true if {@link ReplicationSourceManager#totalBufferUsed} is not more than
* {@link ReplicationSourceManager#totalBufferLimit}.
*/
boolean checkBufferQuota(String peerId) {
// try not to go over total quota
if (totalBufferUsed.get() > totalBufferLimit) {
LOG.warn("peer={}, can't read more edits from WAL as buffer usage {}B exceeds limit {}B",
peerId, totalBufferUsed.get(), totalBufferLimit);
return false;
}
return true;
}
}
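
The quota is soft: acquireBufferQuota() adds the size first and only then reports whether the limit has been reached, so the entry that crosses the limit is still accepted and simply triggers an immediate ship, and the global metrics gauge is refreshed on every change. Below is a stripped-down, self-contained model of that arithmetic; it is not the HBase class, and the gauge callback merely stands in for MetricsReplicationGlobalSourceSource#setWALReaderEditsBufferBytes.

import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongConsumer;

// Minimal model of the accounting centralized in ReplicationSourceManager.
final class SoftQuotaModel {
  private final AtomicLong totalBufferUsed = new AtomicLong();
  private final long totalBufferLimit;
  private final LongConsumer gauge;

  SoftQuotaModel(long totalBufferLimit, LongConsumer gauge) {
    this.totalBufferLimit = totalBufferLimit;
    this.gauge = gauge;
  }

  /** Always adds first, then reports whether the limit has been reached or exceeded. */
  boolean acquire(long size) {
    long newUsed = totalBufferUsed.addAndGet(size);
    gauge.accept(newUsed); // the gauge tracks every change, as in the patch
    return newUsed >= totalBufferLimit;
  }

  void release(long size) {
    gauge.accept(totalBufferUsed.addAndGet(-size));
  }
}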

ReplicationSourceShipper.java

@ -21,7 +21,6 @@ import static org.apache.hadoop.hbase.replication.ReplicationUtils.getAdaptiveTi
import java.io.IOException;
import java.util.List;
import java.util.concurrent.atomic.LongAccumulator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
@ -142,18 +141,6 @@ public class ReplicationSourceShipper extends Thread {
protected void postFinish() {
}
/**
* get batchEntry size excludes bulk load file sizes. Uses ReplicationSourceWALReader's static
* method.
*/
private int getBatchEntrySizeExcludeBulkLoad(WALEntryBatch entryBatch) {
int totalSize = 0;
for (Entry entry : entryBatch.getWalEntries()) {
totalSize += ReplicationSourceWALReader.getEntrySizeExcludeBulkLoad(entry);
}
return totalSize;
}
/**
* Do the shipping logic
*/
@ -165,7 +152,6 @@ public class ReplicationSourceShipper extends Thread {
return;
}
int currentSize = (int) entryBatch.getHeapSize();
int sizeExcludeBulkLoad = getBatchEntrySizeExcludeBulkLoad(entryBatch);
source.getSourceMetrics()
.setTimeStampNextToReplicate(entries.get(entries.size() - 1).getKey().getWriteTime());
while (isActive()) {
@ -209,7 +195,7 @@ public class ReplicationSourceShipper extends Thread {
// this sizeExcludeBulkLoad has to use same calculation that when calling
// acquireBufferQuota() in ReplicationSourceWALReader because they maintain
// same variable: totalBufferUsed
source.postShipEdits(entries, sizeExcludeBulkLoad);
source.postShipEdits(entries, entryBatch.getUsedBufferSize());
// FIXME check relationship between wal group and overall
source.getSourceMetrics().shipBatch(entryBatch.getNbOperations(), currentSize,
entryBatch.getNbHFiles());
@ -372,20 +358,17 @@ public class ReplicationSourceShipper extends Thread {
return;
}
}
LongAccumulator totalToDecrement = new LongAccumulator((a, b) -> a + b, 0);
entryReader.entryBatchQueue.forEach(w -> {
entryReader.entryBatchQueue.remove(w);
w.getWalEntries().forEach(e -> {
long entrySizeExcludeBulkLoad = ReplicationSourceWALReader.getEntrySizeExcludeBulkLoad(e);
totalToDecrement.accumulate(entrySizeExcludeBulkLoad);
});
});
long totalReleasedBytes = 0;
while (true) {
WALEntryBatch batch = entryReader.entryBatchQueue.poll();
if (batch == null) {
break;
}
totalReleasedBytes += source.getSourceManager().releaseWALEntryBatchBufferQuota(batch);
}
if (LOG.isTraceEnabled()) {
LOG.trace("Decrementing totalBufferUsed by {}B while stopping Replication WAL Readers.",
totalToDecrement.longValue());
totalReleasedBytes);
}
long newBufferUsed =
source.getSourceManager().getTotalBufferUsed().addAndGet(-totalToDecrement.longValue());
source.getSourceManager().getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed);
}
}
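
The deleted getBatchEntrySizeExcludeBulkLoad() is no longer needed because the batch itself now remembers how much quota was acquired for it while reading: whatever acquireWALEntryBufferQuota() added to usedBufferSize is exactly what the shipper hands back after shipping. The caller below is hypothetical (assumed to sit in the same package, with endpoint replication elided) and only spells out that symmetry.

import java.util.List;
import org.apache.hadoop.hbase.wal.WAL.Entry;

// Not HBase code; illustrates the acquire/release symmetry the shipper relies on.
final class ReleaseSymmetrySketch {
  static void shipAndRelease(ReplicationSourceInterface source, WALEntryBatch batch) {
    List<Entry> entries = batch.getWalEntries();
    // ... replicate the entries through the endpoint ...
    // Releases exactly what was acquired for this batch while it was being read.
    source.postShipEdits(entries, batch.getUsedBufferSize());
  }
}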

ReplicationSourceWALReader.java

@ -23,7 +23,6 @@ import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@ -35,7 +34,6 @@ import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
@ -75,9 +73,6 @@ class ReplicationSourceWALReader extends Thread {
// Indicates whether this particular worker is running
private boolean isReaderRunning = true;
private AtomicLong totalBufferUsed;
private long totalBufferQuota;
private final String walGroupId;
/**
@ -105,8 +100,6 @@ class ReplicationSourceWALReader extends Thread {
// memory used will be batchSizeCapacity * (nb.batches + 1)
// the +1 is for the current thread reading before placing onto the queue
int batchCount = conf.getInt("replication.source.nb.batches", 1);
this.totalBufferUsed = source.getSourceManager().getTotalBufferUsed();
this.totalBufferQuota = source.getSourceManager().getTotalBufferLimit();
// 1 second
this.sleepForRetries = this.conf.getLong("replication.source.sleepforretries", 1000);
// 5 minutes @ 1 sec per
@ -147,7 +140,7 @@ class ReplicationSourceWALReader extends Thread {
Threads.sleep(sleepForRetries);
continue;
}
if (!checkQuota()) {
if (!checkBufferQuota()) {
continue;
}
Path currentPath = entryStream.getCurrentPath();
@ -188,7 +181,7 @@ class ReplicationSourceWALReader extends Thread {
// batch is not put to ReplicationSourceWALReader#entryBatchQueue, so we should
// decrease ReplicationSourceWALReader.totalBufferUsed by the byte size which was
// acquired in ReplicationSourceWALReader.acquireBufferQuota.
this.releaseBufferQuota(batch);
this.getSourceManager().releaseWALEntryBatchBufferQuota(batch);
}
}
}
@ -218,10 +211,9 @@ class ReplicationSourceWALReader extends Thread {
entry.getKey().getWriteTime(), entry.getKey(), this.source.getQueueId());
updateReplicationMarkerEdit(entry, batch.getLastWalPosition());
long entrySize = getEntrySizeIncludeBulkLoad(entry);
long entrySizeExcludeBulkLoad = getEntrySizeExcludeBulkLoad(entry);
batch.addEntry(entry, entrySize);
updateBatchStats(batch, entry, entrySize);
boolean totalBufferTooLarge = acquireBufferQuota(batch, entrySizeExcludeBulkLoad);
boolean totalBufferTooLarge = this.getSourceManager().acquireWALEntryBufferQuota(batch, entry);
// Stop if too many entries or too big
return totalBufferTooLarge || batch.getHeapSize() >= replicationBatchSizeCapacity
@ -275,11 +267,9 @@ class ReplicationSourceWALReader extends Thread {
}
// returns false if we've already exceeded the global quota
private boolean checkQuota() {
private boolean checkBufferQuota() {
// try not to go over total quota
if (totalBufferUsed.get() > totalBufferQuota) {
LOG.warn("peer={}, can't read more edits from WAL as buffer usage {}B exceeds limit {}B",
this.source.getPeerId(), totalBufferUsed.get(), totalBufferQuota);
if (!this.getSourceManager().checkBufferQuota(this.source.getPeerId())) {
Threads.sleep(sleepForRetries);
return false;
}
@ -319,13 +309,7 @@ class ReplicationSourceWALReader extends Thread {
private long getEntrySizeIncludeBulkLoad(Entry entry) {
WALEdit edit = entry.getEdit();
return getEntrySizeExcludeBulkLoad(entry) + sizeOfStoreFilesIncludeBulkLoad(edit);
}
public static long getEntrySizeExcludeBulkLoad(Entry entry) {
WALEdit edit = entry.getEdit();
WALKey key = entry.getKey();
return edit.heapSize() + key.estimatedSerializedSizeOf();
return WALEntryBatch.getEntrySizeExcludeBulkLoad(entry) + sizeOfStoreFilesIncludeBulkLoad(edit);
}
private void updateBatchStats(WALEntryBatch batch, Entry entry, long entrySize) {
@ -435,30 +419,6 @@ class ReplicationSourceWALReader extends Thread {
edit.setCells(newCells);
}
/**
* @param size delta size for grown buffer
* @return true if we should clear buffer and push all
*/
private boolean acquireBufferQuota(WALEntryBatch walEntryBatch, long size) {
long newBufferUsed = totalBufferUsed.addAndGet(size);
// Record the new buffer usage
this.source.getSourceManager().getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed);
walEntryBatch.incrementUsedBufferSize(size);
return newBufferUsed >= totalBufferQuota;
}
/**
* To release the buffer quota of {@link WALEntryBatch} which acquired by
* {@link ReplicationSourceWALReader#acquireBufferQuota}
*/
private void releaseBufferQuota(WALEntryBatch walEntryBatch) {
long usedBufferSize = walEntryBatch.getUsedBufferSize();
if (usedBufferSize > 0) {
long newBufferUsed = totalBufferUsed.addAndGet(-usedBufferSize);
this.source.getSourceManager().getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed);
}
}
/** Returns whether the reader thread is running */
public boolean isReaderRunning() {
return isReaderRunning && !isInterrupted();
@ -470,4 +430,8 @@ class ReplicationSourceWALReader extends Thread {
public void setReaderRunning(boolean readerRunning) {
this.isReaderRunning = readerRunning;
}
private ReplicationSourceManager getSourceManager() {
return this.source.getSourceManager();
}
}
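
Taken together, the reader now touches the buffer quota only through the manager: it gates reading on checkBufferQuota(), accounts for each entry with acquireWALEntryBufferQuota(), and gives a discarded batch's quota back with releaseWALEntryBatchBufferQuota(). The sketch below is a condensed, hypothetical view of that flow, again assuming the same package; WAL iteration, filtering and the per-batch heap-size cap are elided.

import java.util.Queue;
import org.apache.hadoop.hbase.wal.WAL.Entry;

// Reading aid only; loosely follows ReplicationSourceWALReader after this patch.
final class ReaderFlowSketch {
  static void readOneBatch(ReplicationSourceManager manager, String peerId,
    Iterable<Entry> walEntries, Queue<WALEntryBatch> entryBatchQueue, WALEntryBatch batch) {
    // Gate: if the global buffer is over the limit, back off (the real reader sleeps and retries).
    if (!manager.checkBufferQuota(peerId)) {
      return;
    }
    boolean shipNow = false;
    for (Entry entry : walEntries) {
      // Per-entry accounting; true means the global limit was crossed, hand the batch off now.
      shipNow = manager.acquireWALEntryBufferQuota(batch, entry);
      if (shipNow) {
        break;
      }
    }
    if (shipNow || !batch.getWalEntries().isEmpty()) {
      entryBatchQueue.offer(batch); // the shipper releases the quota after shipping
    } else {
      manager.releaseWALEntryBatchBufferQuota(batch); // batch dropped: give the quota back
    }
  }
}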

ReplicationThrottler.java

@ -100,7 +100,7 @@ public class ReplicationThrottler {
* Add current size to the current cycle's total push size
* @param size is the current size added to the current cycle's total push size
*/
public void addPushSize(final int size) {
public void addPushSize(final long size) {
if (this.enabled) {
this.cyclePushSize += size;
}

WALEntryBatch.java

@ -25,6 +25,8 @@ import java.util.stream.Collectors;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.yetus.audience.InterfaceAudience;
/**
@ -156,8 +158,10 @@ class WALEntryBatch {
lastSeqIds.put(region, sequenceId);
}
public void incrementUsedBufferSize(long increment) {
public long incrementUsedBufferSize(Entry entry) {
long increment = getEntrySizeExcludeBulkLoad(entry);
usedBufferSize += increment;
return increment;
}
public long getUsedBufferSize() {
@ -171,4 +175,10 @@ class WALEntryBatch {
+ nbHFiles + ", heapSize=" + heapSize + ", lastSeqIds=" + lastSeqIds + ", endOfFile="
+ endOfFile + ",usedBufferSize=" + usedBufferSize + "]";
}
static long getEntrySizeExcludeBulkLoad(Entry entry) {
WALEdit edit = entry.getEdit();
WALKey key = entry.getKey();
return edit.heapSize() + key.estimatedSerializedSizeOf();
}
}
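
One relationship these hunks imply but never state: buffer quota is charged using the exclude-bulk-load size that now lives on WALEntryBatch, while the reader's include-bulk-load size (which adds the referenced store file sizes) only feeds the per-batch size cap. The class below is illustrative only and paraphrases the two figures as shown in this commit; it is not new behavior.

import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKey;

// Mirrors WALEntryBatch.getEntrySizeExcludeBulkLoad and the reader's
// getEntrySizeIncludeBulkLoad as shown in this commit.
final class EntrySizeSketch {
  /** Buffer-quota cost of one entry: edit heap size plus the estimated WAL key size. */
  static long excludeBulkLoad(Entry entry) {
    WALEdit edit = entry.getEdit();
    WALKey key = entry.getKey();
    return edit.heapSize() + key.estimatedSerializedSizeOf();
  }

  /** Batch-capacity cost: the same figure plus the size of any referenced bulk load store files. */
  static long includeBulkLoad(Entry entry, long storeFileSizesIncludeBulkLoad) {
    return excludeBulkLoad(entry) + storeFileSizesIncludeBulkLoad;
  }
}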

ReplicationSourceDummy.java

@ -149,7 +149,7 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface {
}
@Override
public void postShipEdits(List<Entry> entries, int batchSize) {
public void postShipEdits(List<Entry> entries, long batchSize) {
}
@Override

TestBasicWALEntryStream.java

@ -294,11 +294,10 @@ public abstract class TestBasicWALEntryStream extends WALEntryStreamTestBase {
Assert.assertEquals(key.getReplicationScopes(), deserializedKey.getReplicationScopes());
}
private ReplicationSource mockReplicationSource(boolean recovered, Configuration conf) {
ReplicationSourceManager mockSourceManager = Mockito.mock(ReplicationSourceManager.class);
when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
when(mockSourceManager.getTotalBufferLimit())
.thenReturn((long) HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT);
private ReplicationSource mockReplicationSource(boolean recovered, Configuration conf)
throws IOException {
ReplicationSourceManager mockSourceManager = new ReplicationSourceManager(null, null, conf,
null, null, null, null, null, null, createMockGlobalMetrics());
Server mockServer = Mockito.mock(Server.class);
ReplicationSource source = Mockito.mock(ReplicationSource.class);
when(source.getSourceManager()).thenReturn(mockSourceManager);
@ -306,6 +305,10 @@ public abstract class TestBasicWALEntryStream extends WALEntryStreamTestBase {
when(source.getWALFileLengthProvider()).thenReturn(log);
when(source.getServer()).thenReturn(mockServer);
when(source.isRecovered()).thenReturn(recovered);
return source;
}
private MetricsReplicationGlobalSourceSource createMockGlobalMetrics() {
MetricsReplicationGlobalSourceSource globalMetrics =
Mockito.mock(MetricsReplicationGlobalSourceSource.class);
final AtomicLong bufferUsedCounter = new AtomicLong(0);
@ -315,12 +318,11 @@ public abstract class TestBasicWALEntryStream extends WALEntryStreamTestBase {
}).when(globalMetrics).setWALReaderEditsBufferBytes(Mockito.anyLong());
when(globalMetrics.getWALReaderEditsBufferBytes())
.then(invocationOnMock -> bufferUsedCounter.get());
when(mockSourceManager.getGlobalMetrics()).thenReturn(globalMetrics);
return source;
return globalMetrics;
}
private ReplicationSourceWALReader createReader(boolean recovered, Configuration conf) {
private ReplicationSourceWALReader createReader(boolean recovered, Configuration conf)
throws IOException {
ReplicationSource source = mockReplicationSource(recovered, conf);
when(source.isPeerEnabled()).thenReturn(true);
ReplicationSourceWALReader reader = new ReplicationSourceWALReader(fs, conf, logQueue, 0,
@ -330,7 +332,7 @@ public abstract class TestBasicWALEntryStream extends WALEntryStreamTestBase {
}
private ReplicationSourceWALReader createReaderWithBadReplicationFilter(int numFailures,
Configuration conf) {
Configuration conf) throws IOException {
ReplicationSource source = mockReplicationSource(false, conf);
when(source.isPeerEnabled()).thenReturn(true);
ReplicationSourceWALReader reader = new ReplicationSourceWALReader(fs, conf, logQueue, 0,
@ -667,12 +669,7 @@ public abstract class TestBasicWALEntryStream extends WALEntryStreamTestBase {
appendEntries(writer1, 3);
localLogQueue.enqueueLog(log1, fakeWalGroupId);
ReplicationSourceManager mockSourceManager = mock(ReplicationSourceManager.class);
// Make it look like the source is from recovered source.
when(mockSourceManager.getOldSources())
.thenReturn(new ArrayList<>(Arrays.asList((ReplicationSourceInterface) source)));
when(source.isPeerEnabled()).thenReturn(true);
when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
// Override the max retries multiplier to fail fast.
conf.setInt("replication.source.maxretriesmultiplier", 1);
conf.setBoolean("replication.source.eof.autorecovery", true);
@ -784,10 +781,8 @@ public abstract class TestBasicWALEntryStream extends WALEntryStreamTestBase {
// make sure the size of the wal file is 0.
assertEquals(0, fs.getFileStatus(archivePath).getLen());
ReplicationSourceManager mockSourceManager = Mockito.mock(ReplicationSourceManager.class);
ReplicationSource source = Mockito.mock(ReplicationSource.class);
when(source.isPeerEnabled()).thenReturn(true);
when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
Configuration localConf = new Configuration(CONF);
localConf.setInt("replication.source.maxretriesmultiplier", 1);
@ -834,11 +829,11 @@ public abstract class TestBasicWALEntryStream extends WALEntryStreamTestBase {
assertNotNull(entryBatch);
assertEquals(3, entryBatch.getWalEntries().size());
long sum = entryBatch.getWalEntries().stream()
.mapToLong(ReplicationSourceWALReader::getEntrySizeExcludeBulkLoad).sum();
.mapToLong(WALEntryBatch::getEntrySizeExcludeBulkLoad).sum();
assertEquals(position, entryBatch.getLastWalPosition());
assertEquals(walPath, entryBatch.getLastWalPath());
assertEquals(3, entryBatch.getNbRowKeys());
assertEquals(sum, source.getSourceManager().getTotalBufferUsed().get());
assertEquals(sum, source.getSourceManager().getTotalBufferUsed());
assertEquals(sum, source.getSourceManager().getGlobalMetrics().getWALReaderEditsBufferBytes());
assertEquals(maxThrowExceptionCount, walEntryFilter.getThrowExceptionCount());
assertNull(reader.poll(10));

TestGlobalReplicationThrottler.java

@ -18,7 +18,6 @@
package org.apache.hadoop.hbase.replication.regionserver;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
@ -133,10 +132,9 @@ public class TestGlobalReplicationThrottler {
Thread watcher = new Thread(() -> {
Replication replication = (Replication) utility1.getMiniHBaseCluster().getRegionServer(0)
.getReplicationSourceService();
AtomicLong bufferUsed = replication.getReplicationManager().getTotalBufferUsed();
testQuotaPass = true;
while (!Thread.interrupted()) {
long size = bufferUsed.get();
long size = replication.getReplicationManager().getTotalBufferUsed();
if (size > 0) {
testQuotaNonZero = true;
}

TestReplicationSource.java

@ -34,7 +34,6 @@ import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@ -133,7 +132,6 @@ public class TestReplicationSource {
.thenReturn(DoNothingReplicationEndpoint.class.getName());
when(mockPeer.getPeerConfig()).thenReturn(peerConfig);
ReplicationSourceManager manager = mock(ReplicationSourceManager.class);
when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong());
Mockito.when(manager.getGlobalMetrics())
.thenReturn(mock(MetricsReplicationGlobalSourceSource.class));
String queueId = "qid";
@ -173,7 +171,6 @@ public class TestReplicationSource {
.thenReturn(DoNothingReplicationEndpoint.class.getName());
when(mockPeer.getPeerConfig()).thenReturn(peerConfig);
ReplicationSourceManager manager = mock(ReplicationSourceManager.class);
when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong());
String queueId = "qid";
RegionServerServices rss =
TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1"));
@ -260,7 +257,6 @@ public class TestReplicationSource {
Configuration testConf = HBaseConfiguration.create();
testConf.setInt("replication.source.maxretriesmultiplier", 1);
ReplicationSourceManager manager = mock(ReplicationSourceManager.class);
when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong());
source.init(testConf, null, manager, null, mockPeer, null, "testPeer", null,
p -> OptionalLong.empty(), null);
ExecutorService executor = Executors.newSingleThreadExecutor();
@ -275,12 +271,8 @@ public class TestReplicationSource {
@Test
public void testTerminateClearsBuffer() throws Exception {
ReplicationSource source = new ReplicationSource();
ReplicationSourceManager mockManager = mock(ReplicationSourceManager.class);
MetricsReplicationGlobalSourceSource mockMetrics =
mock(MetricsReplicationGlobalSourceSource.class);
AtomicLong buffer = new AtomicLong();
Mockito.when(mockManager.getTotalBufferUsed()).thenReturn(buffer);
Mockito.when(mockManager.getGlobalMetrics()).thenReturn(mockMetrics);
ReplicationSourceManager mockManager = new ReplicationSourceManager(null, null, conf, null,
null, null, null, null, null, mock(MetricsReplicationGlobalSourceSource.class));
ReplicationPeer mockPeer = mock(ReplicationPeer.class);
Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L);
Configuration testConf = HBaseConfiguration.create();
@ -309,7 +301,7 @@ public class TestReplicationSource {
reader.addEntryToBatch(batch, mockEntry);
reader.entryBatchQueue.put(batch);
source.terminate("test");
assertEquals(0, source.getSourceManager().getTotalBufferUsed().get());
assertEquals(0, source.getSourceManager().getTotalBufferUsed());
}
/**
@ -528,7 +520,6 @@ public class TestReplicationSource {
when(peerConfig.getReplicationEndpointImpl()).thenReturn(endpointName);
when(mockPeer.getPeerConfig()).thenReturn(peerConfig);
ReplicationSourceManager manager = mock(ReplicationSourceManager.class);
when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong());
Mockito.when(manager.getGlobalMetrics())
.thenReturn(mock(MetricsReplicationGlobalSourceSource.class));
String queueId = "qid";
@ -645,7 +636,6 @@ public class TestReplicationSource {
.thenReturn(DoNothingReplicationEndpoint.class.getName());
Mockito.when(mockPeer.getPeerConfig()).thenReturn(peerConfig);
ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class);
Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong());
Mockito.when(manager.getGlobalMetrics())
.thenReturn(mock(MetricsReplicationGlobalSourceSource.class));
RegionServerServices rss =

TestWALEntryStreamCompressionReset.java

@ -26,7 +26,6 @@ import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.OptionalLong;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
@ -155,11 +154,8 @@ public class TestWALEntryStreamCompressionReset {
when(SOURCE.getServerWALsBelongTo())
.thenReturn(ServerName.valueOf("localhost", 12345, EnvironmentEdgeManager.currentTime()));
when(SOURCE.getSourceMetrics()).thenReturn(METRICS_SOURCE);
ReplicationSourceManager rsm = mock(ReplicationSourceManager.class);
when(rsm.getTotalBufferUsed()).thenReturn(new AtomicLong());
when(rsm.getTotalBufferLimit())
.thenReturn((long) HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT);
when(rsm.getGlobalMetrics()).thenReturn(mock(MetricsReplicationGlobalSourceSource.class));
ReplicationSourceManager rsm = new ReplicationSourceManager(null, null, conf, null, null, null,
null, null, null, mock(MetricsReplicationGlobalSourceSource.class));
when(SOURCE.getSourceManager()).thenReturn(rsm);
LOG_QUEUE = new ReplicationSourceLogQueue(conf, METRICS_SOURCE, SOURCE);