diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index 5fd000944bf..a980d77eae1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -139,8 +139,6 @@ public class ReplicationSource implements ReplicationSourceInterface { protected final ConcurrentHashMap workerThreads = new ConcurrentHashMap<>(); - private AtomicLong totalBufferUsed; - public static final String WAIT_ON_ENDPOINT_SECONDS = "hbase.replication.wait.on.endpoint.seconds"; public static final int DEFAULT_WAIT_ON_ENDPOINT_SECONDS = 30; @@ -224,7 +222,6 @@ public class ReplicationSource implements ReplicationSourceInterface { defaultBandwidth = this.conf.getLong("replication.source.per.peer.node.bandwidth", 0); currentBandwidth = getCurrentBandwidth(); this.throttler = new ReplicationThrottler((double) currentBandwidth / 10.0); - this.totalBufferUsed = manager.getTotalBufferUsed(); this.walFileLengthProvider = walFileLengthProvider; this.abortOnError = this.conf.getBoolean("replication.source.regionserver.abort", true); @@ -797,14 +794,12 @@ public class ReplicationSource implements ReplicationSourceInterface { @Override // offsets totalBufferUsed by deducting shipped batchSize. - public void postShipEdits(List entries, int batchSize) { + public void postShipEdits(List entries, long batchSize) { if (throttler.isEnabled()) { throttler.addPushSize(batchSize); } totalReplicatedEdits.addAndGet(entries.size()); - long newBufferUsed = totalBufferUsed.addAndGet(-batchSize); - // Record the new buffer usage - this.manager.getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed); + this.manager.releaseBufferQuota(batchSize); } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java index 9af7ea932c3..97d2d01b9e7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java @@ -149,7 +149,7 @@ public interface ReplicationSourceInterface { * @param entries pushed * @param batchSize entries size pushed */ - void postShipEdits(List entries, int batchSize); + void postShipEdits(List entries, long batchSize); /** * The queue of WALs only belong to one region server. This will return the server name which all diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index cbda89ede93..35c217940ee 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -58,6 +58,7 @@ import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; import org.apache.hadoop.hbase.wal.WAL; +import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.hadoop.hbase.wal.WALFactory; import org.apache.hadoop.hbase.wal.WALProvider; import org.apache.yetus.audience.InterfaceAudience; @@ -783,8 +784,8 @@ public class ReplicationSourceManager { } } - public AtomicLong getTotalBufferUsed() { - return totalBufferUsed; + public long getTotalBufferUsed() { + return totalBufferUsed.get(); } /** @@ -834,7 +835,7 @@ public class ReplicationSourceManager { StringBuilder stats = new StringBuilder(); // Print stats that apply across all Replication Sources stats.append("Global stats: "); - stats.append("WAL Edits Buffer Used=").append(getTotalBufferUsed().get()).append("B, Limit=") + stats.append("WAL Edits Buffer Used=").append(getTotalBufferUsed()).append("B, Limit=") .append(getTotalBufferLimit()).append("B\n"); for (ReplicationSourceInterface source : this.sources.values()) { stats.append("Normal source for cluster " + source.getPeerId() + ": "); @@ -942,4 +943,80 @@ public class ReplicationSourceManager { ReplicationQueueStorage getQueueStorage() { return queueStorage; } + + /** + * Acquire the buffer quota for {@link Entry} which is added to {@link WALEntryBatch}. + * @param entry the wal entry which is added to {@link WALEntryBatch} and should acquire buffer + * quota. + * @return true if we should clear buffer and push all + */ + boolean acquireWALEntryBufferQuota(WALEntryBatch walEntryBatch, Entry entry) { + long entrySize = walEntryBatch.incrementUsedBufferSize(entry); + return this.acquireBufferQuota(entrySize); + } + + /** + * To release the buffer quota of {@link WALEntryBatch} which acquired by + * {@link ReplicationSourceManager#acquireWALEntryBufferQuota}. + * @return the released buffer quota size. + */ + long releaseWALEntryBatchBufferQuota(WALEntryBatch walEntryBatch) { + long usedBufferSize = walEntryBatch.getUsedBufferSize(); + if (usedBufferSize > 0) { + this.releaseBufferQuota(usedBufferSize); + } + return usedBufferSize; + } + + /** + * Add the size to {@link ReplicationSourceManager#totalBufferUsed} and check if it exceeds + * {@link ReplicationSourceManager#totalBufferLimit}. + * @return true if {@link ReplicationSourceManager#totalBufferUsed} exceeds + * {@link ReplicationSourceManager#totalBufferLimit},we should stop increase buffer and + * ship all. + */ + boolean acquireBufferQuota(long size) { + if (size < 0) { + throw new IllegalArgumentException("size should not less than 0"); + } + long newBufferUsed = addTotalBufferUsed(size); + return newBufferUsed >= totalBufferLimit; + } + + /** + * To release the buffer quota which acquired by + * {@link ReplicationSourceManager#acquireBufferQuota}. + */ + void releaseBufferQuota(long size) { + if (size < 0) { + throw new IllegalArgumentException("size should not less than 0"); + } + addTotalBufferUsed(-size); + } + + private long addTotalBufferUsed(long size) { + if (size == 0) { + return totalBufferUsed.get(); + } + long newBufferUsed = totalBufferUsed.addAndGet(size); + // Record the new buffer usage + this.globalMetrics.setWALReaderEditsBufferBytes(newBufferUsed); + return newBufferUsed; + } + + /** + * Check if {@link ReplicationSourceManager#totalBufferUsed} exceeds + * {@link ReplicationSourceManager#totalBufferLimit} for peer. + * @return true if {@link ReplicationSourceManager#totalBufferUsed} not more than + * {@link ReplicationSourceManager#totalBufferLimit}. + */ + boolean checkBufferQuota(String peerId) { + // try not to go over total quota + if (totalBufferUsed.get() > totalBufferLimit) { + LOG.warn("peer={}, can't read more edits from WAL as buffer usage {}B exceeds limit {}B", + peerId, totalBufferUsed.get(), totalBufferLimit); + return false; + } + return true; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java index 8e5b2e5a1d3..50dbaca7ff6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java @@ -21,7 +21,6 @@ import static org.apache.hadoop.hbase.replication.ReplicationUtils.getAdaptiveTi import java.io.IOException; import java.util.List; -import java.util.concurrent.atomic.LongAccumulator; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; @@ -142,18 +141,6 @@ public class ReplicationSourceShipper extends Thread { protected void postFinish() { } - /** - * get batchEntry size excludes bulk load file sizes. Uses ReplicationSourceWALReader's static - * method. - */ - private int getBatchEntrySizeExcludeBulkLoad(WALEntryBatch entryBatch) { - int totalSize = 0; - for (Entry entry : entryBatch.getWalEntries()) { - totalSize += ReplicationSourceWALReader.getEntrySizeExcludeBulkLoad(entry); - } - return totalSize; - } - /** * Do the shipping logic */ @@ -165,7 +152,6 @@ public class ReplicationSourceShipper extends Thread { return; } int currentSize = (int) entryBatch.getHeapSize(); - int sizeExcludeBulkLoad = getBatchEntrySizeExcludeBulkLoad(entryBatch); source.getSourceMetrics() .setTimeStampNextToReplicate(entries.get(entries.size() - 1).getKey().getWriteTime()); while (isActive()) { @@ -209,7 +195,7 @@ public class ReplicationSourceShipper extends Thread { // this sizeExcludeBulkLoad has to use same calculation that when calling // acquireBufferQuota() in ReplicationSourceWALReader because they maintain // same variable: totalBufferUsed - source.postShipEdits(entries, sizeExcludeBulkLoad); + source.postShipEdits(entries, entryBatch.getUsedBufferSize()); // FIXME check relationship between wal group and overall source.getSourceMetrics().shipBatch(entryBatch.getNbOperations(), currentSize, entryBatch.getNbHFiles()); @@ -372,20 +358,17 @@ public class ReplicationSourceShipper extends Thread { return; } } - LongAccumulator totalToDecrement = new LongAccumulator((a, b) -> a + b, 0); - entryReader.entryBatchQueue.forEach(w -> { - entryReader.entryBatchQueue.remove(w); - w.getWalEntries().forEach(e -> { - long entrySizeExcludeBulkLoad = ReplicationSourceWALReader.getEntrySizeExcludeBulkLoad(e); - totalToDecrement.accumulate(entrySizeExcludeBulkLoad); - }); - }); + long totalReleasedBytes = 0; + while (true) { + WALEntryBatch batch = entryReader.entryBatchQueue.poll(); + if (batch == null) { + break; + } + totalReleasedBytes += source.getSourceManager().releaseWALEntryBatchBufferQuota(batch); + } if (LOG.isTraceEnabled()) { LOG.trace("Decrementing totalBufferUsed by {}B while stopping Replication WAL Readers.", - totalToDecrement.longValue()); + totalReleasedBytes); } - long newBufferUsed = - source.getSourceManager().getTotalBufferUsed().addAndGet(-totalToDecrement.longValue()); - source.getSourceManager().getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java index d52ed86b2ff..bd5b7736f3b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java @@ -23,7 +23,6 @@ import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -35,7 +34,6 @@ import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.hadoop.hbase.wal.WALEdit; -import org.apache.hadoop.hbase.wal.WALKey; import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceStability; import org.slf4j.Logger; @@ -75,9 +73,6 @@ class ReplicationSourceWALReader extends Thread { // Indicates whether this particular worker is running private boolean isReaderRunning = true; - - private AtomicLong totalBufferUsed; - private long totalBufferQuota; private final String walGroupId; /** @@ -105,8 +100,6 @@ class ReplicationSourceWALReader extends Thread { // memory used will be batchSizeCapacity * (nb.batches + 1) // the +1 is for the current thread reading before placing onto the queue int batchCount = conf.getInt("replication.source.nb.batches", 1); - this.totalBufferUsed = source.getSourceManager().getTotalBufferUsed(); - this.totalBufferQuota = source.getSourceManager().getTotalBufferLimit(); // 1 second this.sleepForRetries = this.conf.getLong("replication.source.sleepforretries", 1000); // 5 minutes @ 1 sec per @@ -147,7 +140,7 @@ class ReplicationSourceWALReader extends Thread { Threads.sleep(sleepForRetries); continue; } - if (!checkQuota()) { + if (!checkBufferQuota()) { continue; } Path currentPath = entryStream.getCurrentPath(); @@ -188,7 +181,7 @@ class ReplicationSourceWALReader extends Thread { // batch is not put to ReplicationSourceWALReader#entryBatchQueue,so we should // decrease ReplicationSourceWALReader.totalBufferUsed by the byte size which // acquired in ReplicationSourceWALReader.acquireBufferQuota. - this.releaseBufferQuota(batch); + this.getSourceManager().releaseWALEntryBatchBufferQuota(batch); } } } @@ -218,10 +211,9 @@ class ReplicationSourceWALReader extends Thread { entry.getKey().getWriteTime(), entry.getKey(), this.source.getQueueId()); updateReplicationMarkerEdit(entry, batch.getLastWalPosition()); long entrySize = getEntrySizeIncludeBulkLoad(entry); - long entrySizeExcludeBulkLoad = getEntrySizeExcludeBulkLoad(entry); batch.addEntry(entry, entrySize); updateBatchStats(batch, entry, entrySize); - boolean totalBufferTooLarge = acquireBufferQuota(batch, entrySizeExcludeBulkLoad); + boolean totalBufferTooLarge = this.getSourceManager().acquireWALEntryBufferQuota(batch, entry); // Stop if too many entries or too big return totalBufferTooLarge || batch.getHeapSize() >= replicationBatchSizeCapacity @@ -275,11 +267,9 @@ class ReplicationSourceWALReader extends Thread { } // returns false if we've already exceeded the global quota - private boolean checkQuota() { + private boolean checkBufferQuota() { // try not to go over total quota - if (totalBufferUsed.get() > totalBufferQuota) { - LOG.warn("peer={}, can't read more edits from WAL as buffer usage {}B exceeds limit {}B", - this.source.getPeerId(), totalBufferUsed.get(), totalBufferQuota); + if (!this.getSourceManager().checkBufferQuota(this.source.getPeerId())) { Threads.sleep(sleepForRetries); return false; } @@ -319,13 +309,7 @@ class ReplicationSourceWALReader extends Thread { private long getEntrySizeIncludeBulkLoad(Entry entry) { WALEdit edit = entry.getEdit(); - return getEntrySizeExcludeBulkLoad(entry) + sizeOfStoreFilesIncludeBulkLoad(edit); - } - - public static long getEntrySizeExcludeBulkLoad(Entry entry) { - WALEdit edit = entry.getEdit(); - WALKey key = entry.getKey(); - return edit.heapSize() + key.estimatedSerializedSizeOf(); + return WALEntryBatch.getEntrySizeExcludeBulkLoad(entry) + sizeOfStoreFilesIncludeBulkLoad(edit); } private void updateBatchStats(WALEntryBatch batch, Entry entry, long entrySize) { @@ -435,30 +419,6 @@ class ReplicationSourceWALReader extends Thread { edit.setCells(newCells); } - /** - * @param size delta size for grown buffer - * @return true if we should clear buffer and push all - */ - private boolean acquireBufferQuota(WALEntryBatch walEntryBatch, long size) { - long newBufferUsed = totalBufferUsed.addAndGet(size); - // Record the new buffer usage - this.source.getSourceManager().getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed); - walEntryBatch.incrementUsedBufferSize(size); - return newBufferUsed >= totalBufferQuota; - } - - /** - * To release the buffer quota of {@link WALEntryBatch} which acquired by - * {@link ReplicationSourceWALReader#acquireBufferQuota} - */ - private void releaseBufferQuota(WALEntryBatch walEntryBatch) { - long usedBufferSize = walEntryBatch.getUsedBufferSize(); - if (usedBufferSize > 0) { - long newBufferUsed = totalBufferUsed.addAndGet(-usedBufferSize); - this.source.getSourceManager().getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed); - } - } - /** Returns whether the reader thread is running */ public boolean isReaderRunning() { return isReaderRunning && !isInterrupted(); @@ -470,4 +430,8 @@ class ReplicationSourceWALReader extends Thread { public void setReaderRunning(boolean readerRunning) { this.isReaderRunning = readerRunning; } + + private ReplicationSourceManager getSourceManager() { + return this.source.getSourceManager(); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationThrottler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationThrottler.java index 3e4bb77b23f..e39082c8e50 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationThrottler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationThrottler.java @@ -100,7 +100,7 @@ public class ReplicationThrottler { * Add current size to the current cycle's total push size * @param size is the current size added to the current cycle's total push size */ - public void addPushSize(final int size) { + public void addPushSize(final long size) { if (this.enabled) { this.cyclePushSize += size; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java index 32a149db9cd..9d791f39d40 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java @@ -25,6 +25,8 @@ import java.util.stream.Collectors; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.wal.WAL.Entry; +import org.apache.hadoop.hbase.wal.WALEdit; +import org.apache.hadoop.hbase.wal.WALKey; import org.apache.yetus.audience.InterfaceAudience; /** @@ -156,8 +158,10 @@ class WALEntryBatch { lastSeqIds.put(region, sequenceId); } - public void incrementUsedBufferSize(long increment) { + public long incrementUsedBufferSize(Entry entry) { + long increment = getEntrySizeExcludeBulkLoad(entry); usedBufferSize += increment; + return increment; } public long getUsedBufferSize() { @@ -171,4 +175,10 @@ class WALEntryBatch { + nbHFiles + ", heapSize=" + heapSize + ", lastSeqIds=" + lastSeqIds + ", endOfFile=" + endOfFile + ",usedBufferSize=" + usedBufferSize + "]"; } + + static long getEntrySizeExcludeBulkLoad(Entry entry) { + WALEdit edit = entry.getEdit(); + WALKey key = entry.getKey(); + return edit.heapSize() + key.estimatedSerializedSizeOf(); + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java index ac553de178d..00d8821899b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java @@ -149,7 +149,7 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface { } @Override - public void postShipEdits(List entries, int batchSize) { + public void postShipEdits(List entries, long batchSize) { } @Override diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestBasicWALEntryStream.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestBasicWALEntryStream.java index 01f0659de58..2906c922abc 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestBasicWALEntryStream.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestBasicWALEntryStream.java @@ -294,11 +294,10 @@ public abstract class TestBasicWALEntryStream extends WALEntryStreamTestBase { Assert.assertEquals(key.getReplicationScopes(), deserializedKey.getReplicationScopes()); } - private ReplicationSource mockReplicationSource(boolean recovered, Configuration conf) { - ReplicationSourceManager mockSourceManager = Mockito.mock(ReplicationSourceManager.class); - when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0)); - when(mockSourceManager.getTotalBufferLimit()) - .thenReturn((long) HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT); + private ReplicationSource mockReplicationSource(boolean recovered, Configuration conf) + throws IOException { + ReplicationSourceManager mockSourceManager = new ReplicationSourceManager(null, null, conf, + null, null, null, null, null, null, createMockGlobalMetrics()); Server mockServer = Mockito.mock(Server.class); ReplicationSource source = Mockito.mock(ReplicationSource.class); when(source.getSourceManager()).thenReturn(mockSourceManager); @@ -306,6 +305,10 @@ public abstract class TestBasicWALEntryStream extends WALEntryStreamTestBase { when(source.getWALFileLengthProvider()).thenReturn(log); when(source.getServer()).thenReturn(mockServer); when(source.isRecovered()).thenReturn(recovered); + return source; + } + + private MetricsReplicationGlobalSourceSource createMockGlobalMetrics() { MetricsReplicationGlobalSourceSource globalMetrics = Mockito.mock(MetricsReplicationGlobalSourceSource.class); final AtomicLong bufferUsedCounter = new AtomicLong(0); @@ -315,12 +318,11 @@ public abstract class TestBasicWALEntryStream extends WALEntryStreamTestBase { }).when(globalMetrics).setWALReaderEditsBufferBytes(Mockito.anyLong()); when(globalMetrics.getWALReaderEditsBufferBytes()) .then(invocationOnMock -> bufferUsedCounter.get()); - - when(mockSourceManager.getGlobalMetrics()).thenReturn(globalMetrics); - return source; + return globalMetrics; } - private ReplicationSourceWALReader createReader(boolean recovered, Configuration conf) { + private ReplicationSourceWALReader createReader(boolean recovered, Configuration conf) + throws IOException { ReplicationSource source = mockReplicationSource(recovered, conf); when(source.isPeerEnabled()).thenReturn(true); ReplicationSourceWALReader reader = new ReplicationSourceWALReader(fs, conf, logQueue, 0, @@ -330,7 +332,7 @@ public abstract class TestBasicWALEntryStream extends WALEntryStreamTestBase { } private ReplicationSourceWALReader createReaderWithBadReplicationFilter(int numFailures, - Configuration conf) { + Configuration conf) throws IOException { ReplicationSource source = mockReplicationSource(false, conf); when(source.isPeerEnabled()).thenReturn(true); ReplicationSourceWALReader reader = new ReplicationSourceWALReader(fs, conf, logQueue, 0, @@ -667,12 +669,7 @@ public abstract class TestBasicWALEntryStream extends WALEntryStreamTestBase { appendEntries(writer1, 3); localLogQueue.enqueueLog(log1, fakeWalGroupId); - ReplicationSourceManager mockSourceManager = mock(ReplicationSourceManager.class); - // Make it look like the source is from recovered source. - when(mockSourceManager.getOldSources()) - .thenReturn(new ArrayList<>(Arrays.asList((ReplicationSourceInterface) source))); when(source.isPeerEnabled()).thenReturn(true); - when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0)); // Override the max retries multiplier to fail fast. conf.setInt("replication.source.maxretriesmultiplier", 1); conf.setBoolean("replication.source.eof.autorecovery", true); @@ -784,10 +781,8 @@ public abstract class TestBasicWALEntryStream extends WALEntryStreamTestBase { // make sure the size of the wal file is 0. assertEquals(0, fs.getFileStatus(archivePath).getLen()); - ReplicationSourceManager mockSourceManager = Mockito.mock(ReplicationSourceManager.class); ReplicationSource source = Mockito.mock(ReplicationSource.class); when(source.isPeerEnabled()).thenReturn(true); - when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0)); Configuration localConf = new Configuration(CONF); localConf.setInt("replication.source.maxretriesmultiplier", 1); @@ -834,11 +829,11 @@ public abstract class TestBasicWALEntryStream extends WALEntryStreamTestBase { assertNotNull(entryBatch); assertEquals(3, entryBatch.getWalEntries().size()); long sum = entryBatch.getWalEntries().stream() - .mapToLong(ReplicationSourceWALReader::getEntrySizeExcludeBulkLoad).sum(); + .mapToLong(WALEntryBatch::getEntrySizeExcludeBulkLoad).sum(); assertEquals(position, entryBatch.getLastWalPosition()); assertEquals(walPath, entryBatch.getLastWalPath()); assertEquals(3, entryBatch.getNbRowKeys()); - assertEquals(sum, source.getSourceManager().getTotalBufferUsed().get()); + assertEquals(sum, source.getSourceManager().getTotalBufferUsed()); assertEquals(sum, source.getSourceManager().getGlobalMetrics().getWALReaderEditsBufferBytes()); assertEquals(maxThrowExceptionCount, walEntryFilter.getThrowExceptionCount()); assertNull(reader.poll(10)); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestGlobalReplicationThrottler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestGlobalReplicationThrottler.java index 5f4ceaa60da..698c19a80e8 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestGlobalReplicationThrottler.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestGlobalReplicationThrottler.java @@ -18,7 +18,6 @@ package org.apache.hadoop.hbase.replication.regionserver; import java.io.IOException; -import java.util.concurrent.atomic.AtomicLong; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseConfiguration; @@ -133,10 +132,9 @@ public class TestGlobalReplicationThrottler { Thread watcher = new Thread(() -> { Replication replication = (Replication) utility1.getMiniHBaseCluster().getRegionServer(0) .getReplicationSourceService(); - AtomicLong bufferUsed = replication.getReplicationManager().getTotalBufferUsed(); testQuotaPass = true; while (!Thread.interrupted()) { - long size = bufferUsed.get(); + long size = replication.getReplicationManager().getTotalBufferUsed(); if (size > 0) { testQuotaNonZero = true; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java index 35eb343dbff..b12c8eb7389 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java @@ -34,7 +34,6 @@ import java.util.UUID; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; -import java.util.concurrent.atomic.AtomicLong; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -133,7 +132,6 @@ public class TestReplicationSource { .thenReturn(DoNothingReplicationEndpoint.class.getName()); when(mockPeer.getPeerConfig()).thenReturn(peerConfig); ReplicationSourceManager manager = mock(ReplicationSourceManager.class); - when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong()); Mockito.when(manager.getGlobalMetrics()) .thenReturn(mock(MetricsReplicationGlobalSourceSource.class)); String queueId = "qid"; @@ -173,7 +171,6 @@ public class TestReplicationSource { .thenReturn(DoNothingReplicationEndpoint.class.getName()); when(mockPeer.getPeerConfig()).thenReturn(peerConfig); ReplicationSourceManager manager = mock(ReplicationSourceManager.class); - when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong()); String queueId = "qid"; RegionServerServices rss = TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1")); @@ -260,7 +257,6 @@ public class TestReplicationSource { Configuration testConf = HBaseConfiguration.create(); testConf.setInt("replication.source.maxretriesmultiplier", 1); ReplicationSourceManager manager = mock(ReplicationSourceManager.class); - when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong()); source.init(testConf, null, manager, null, mockPeer, null, "testPeer", null, p -> OptionalLong.empty(), null); ExecutorService executor = Executors.newSingleThreadExecutor(); @@ -275,12 +271,8 @@ public class TestReplicationSource { @Test public void testTerminateClearsBuffer() throws Exception { ReplicationSource source = new ReplicationSource(); - ReplicationSourceManager mockManager = mock(ReplicationSourceManager.class); - MetricsReplicationGlobalSourceSource mockMetrics = - mock(MetricsReplicationGlobalSourceSource.class); - AtomicLong buffer = new AtomicLong(); - Mockito.when(mockManager.getTotalBufferUsed()).thenReturn(buffer); - Mockito.when(mockManager.getGlobalMetrics()).thenReturn(mockMetrics); + ReplicationSourceManager mockManager = new ReplicationSourceManager(null, null, conf, null, + null, null, null, null, null, mock(MetricsReplicationGlobalSourceSource.class)); ReplicationPeer mockPeer = mock(ReplicationPeer.class); Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L); Configuration testConf = HBaseConfiguration.create(); @@ -309,7 +301,7 @@ public class TestReplicationSource { reader.addEntryToBatch(batch, mockEntry); reader.entryBatchQueue.put(batch); source.terminate("test"); - assertEquals(0, source.getSourceManager().getTotalBufferUsed().get()); + assertEquals(0, source.getSourceManager().getTotalBufferUsed()); } /** @@ -528,7 +520,6 @@ public class TestReplicationSource { when(peerConfig.getReplicationEndpointImpl()).thenReturn(endpointName); when(mockPeer.getPeerConfig()).thenReturn(peerConfig); ReplicationSourceManager manager = mock(ReplicationSourceManager.class); - when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong()); Mockito.when(manager.getGlobalMetrics()) .thenReturn(mock(MetricsReplicationGlobalSourceSource.class)); String queueId = "qid"; @@ -645,7 +636,6 @@ public class TestReplicationSource { .thenReturn(DoNothingReplicationEndpoint.class.getName()); Mockito.when(mockPeer.getPeerConfig()).thenReturn(peerConfig); ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class); - Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong()); Mockito.when(manager.getGlobalMetrics()) .thenReturn(mock(MetricsReplicationGlobalSourceSource.class)); RegionServerServices rss = diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStreamCompressionReset.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStreamCompressionReset.java index dda72f4304a..b687aaeee6c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStreamCompressionReset.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStreamCompressionReset.java @@ -26,7 +26,6 @@ import java.util.ArrayList; import java.util.List; import java.util.NavigableMap; import java.util.OptionalLong; -import java.util.concurrent.atomic.AtomicLong; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; @@ -155,11 +154,8 @@ public class TestWALEntryStreamCompressionReset { when(SOURCE.getServerWALsBelongTo()) .thenReturn(ServerName.valueOf("localhost", 12345, EnvironmentEdgeManager.currentTime())); when(SOURCE.getSourceMetrics()).thenReturn(METRICS_SOURCE); - ReplicationSourceManager rsm = mock(ReplicationSourceManager.class); - when(rsm.getTotalBufferUsed()).thenReturn(new AtomicLong()); - when(rsm.getTotalBufferLimit()) - .thenReturn((long) HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT); - when(rsm.getGlobalMetrics()).thenReturn(mock(MetricsReplicationGlobalSourceSource.class)); + ReplicationSourceManager rsm = new ReplicationSourceManager(null, null, conf, null, null, null, + null, null, null, mock(MetricsReplicationGlobalSourceSource.class)); when(SOURCE.getSourceManager()).thenReturn(rsm); LOG_QUEUE = new ReplicationSourceLogQueue(conf, METRICS_SOURCE, SOURCE);