diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index 4051efe1225..d21d83c06ba 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -598,6 +598,7 @@ public class ReplicationSource implements ReplicationSourceInterface { } @Override + //offsets totalBufferUsed by deducting shipped batchSize. public void postShipEdits(List entries, int batchSize) { if (throttler.isEnabled()) { throttler.addPushSize(batchSize); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java index 11fd660c2b6..123ecbea5c9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java @@ -123,6 +123,18 @@ public class ReplicationSourceShipper extends Thread { protected void postFinish() { } + /** + * get batchEntry size excludes bulk load file sizes. + * Uses ReplicationSourceWALReader's static method. + */ + private int getBatchEntrySizeExcludeBulkLoad(WALEntryBatch entryBatch) { + int totalSize = 0; + for(Entry entry : entryBatch.getWalEntries()) { + totalSize += entryReader.getEntrySizeExcludeBulkLoad(entry); + } + return totalSize; + } + /** * Do the shipping logic */ @@ -139,6 +151,7 @@ public class ReplicationSourceShipper extends Thread { return; } int currentSize = (int) entryBatch.getHeapSize(); + int sizeExcludeBulkLoad = getBatchEntrySizeExcludeBulkLoad(entryBatch); while (isActive()) { try { try { @@ -175,7 +188,11 @@ public class ReplicationSourceShipper extends Thread { // Log and clean up WAL logs updateLogPosition(entryBatch); - source.postShipEdits(entries, currentSize); + //offsets totalBufferUsed by deducting shipped batchSize (excludes bulk load size) + //this sizeExcludeBulkLoad has to use same calculation that when calling + //acquireBufferQuota() in ReplicatinoSourceWALReader because they maintain + //same variable: totalBufferUsed + source.postShipEdits(entries, sizeExcludeBulkLoad); // FIXME check relationship between wal group and overall source.getSourceMetrics().shipBatch(entryBatch.getNbOperations(), currentSize, entryBatch.getNbHFiles()); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java index 64fd48df900..f685a9b2998 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java @@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.hadoop.hbase.wal.WALEdit; +import org.apache.hadoop.hbase.wal.WALKey; import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceStability; import org.slf4j.Logger; @@ -168,10 +169,12 @@ class ReplicationSourceWALReader extends Thread { if (edit == null || edit.isEmpty()) { return false; } - long entrySize = getEntrySize(entry); + long entrySize = getEntrySizeIncludeBulkLoad(entry); + long entrySizeExcludeBulkLoad = getEntrySizeExcludeBulkLoad(entry); batch.addEntry(entry); updateBatchStats(batch, entry, entrySize); - boolean totalBufferTooLarge = acquireBufferQuota(entrySize); + boolean totalBufferTooLarge = acquireBufferQuota(entrySizeExcludeBulkLoad); + // Stop if too many entries or too big return totalBufferTooLarge || batch.getHeapSize() >= replicationBatchSizeCapacity || batch.getNbEntries() >= replicationBatchCountCapacity; @@ -296,11 +299,20 @@ class ReplicationSourceWALReader extends Thread { return entryBatchQueue.take(); } - private long getEntrySize(Entry entry) { + private long getEntrySizeIncludeBulkLoad(Entry entry) { WALEdit edit = entry.getEdit(); - return edit.heapSize() + calculateTotalSizeOfStoreFiles(edit); + WALKey key = entry.getKey(); + return edit.heapSize() + sizeOfStoreFilesIncludeBulkLoad(edit) + + key.estimatedSerializedSizeOf(); } + public static long getEntrySizeExcludeBulkLoad(Entry entry) { + WALEdit edit = entry.getEdit(); + WALKey key = entry.getKey(); + return edit.heapSize() + key.estimatedSerializedSizeOf(); + } + + private void updateBatchStats(WALEntryBatch batch, Entry entry, long entrySize) { WALEdit edit = entry.getEdit(); batch.incrementHeapSize(entrySize); @@ -353,7 +365,7 @@ class ReplicationSourceWALReader extends Thread { * @param edit edit to count row keys from * @return the total size of the store files */ - private int calculateTotalSizeOfStoreFiles(WALEdit edit) { + private int sizeOfStoreFilesIncludeBulkLoad(WALEdit edit) { List cells = edit.getCells(); int totalStoreFilesSize = 0; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestGlobalThrottler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestGlobalThrottler.java index d3b4e8e430c..fecce029092 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestGlobalThrottler.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestGlobalThrottler.java @@ -64,6 +64,8 @@ public class TestGlobalThrottler { HBaseClassTestRule.forClass(TestGlobalThrottler.class); private static final Logger LOG = LoggerFactory.getLogger(TestGlobalThrottler.class); + private static final int REPLICATION_SOURCE_QUOTA = 200; + private static int numOfPeer = 0; private static Configuration conf1; private static Configuration conf2; @@ -84,7 +86,7 @@ public class TestGlobalThrottler { conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1"); conf1.setLong("replication.source.sleepforretries", 100); // Each WAL is about 120 bytes - conf1.setInt(HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_KEY, 200); + conf1.setInt(HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_KEY, REPLICATION_SOURCE_QUOTA); conf1.setLong("replication.source.per.peer.node.bandwidth", 100L); utility1 = new HBaseTestingUtility(conf1); @@ -109,6 +111,7 @@ public class TestGlobalThrottler { admin1.addPeer("peer1", rpc, null); admin1.addPeer("peer2", rpc, null); admin1.addPeer("peer3", rpc, null); + numOfPeer = admin1.getPeersCount(); } @AfterClass @@ -140,7 +143,10 @@ public class TestGlobalThrottler { if (size > 0) { testQuotaNonZero = true; } - if (size > 600) { + //the reason here doing "numOfPeer + 1" is because by using method addEntryToBatch(), even the + // batch size (after added last entry) exceeds quota, it still keeps the last one in the batch + // so total used buffer size can be one "replication.total.buffer.quota" larger than expected + if (size > REPLICATION_SOURCE_QUOTA * (numOfPeer + 1)) { // We read logs first then check throttler, so if the buffer quota limiter doesn't // take effect, it will push many logs and exceed the quota. testQuotaPass = false; @@ -181,13 +187,5 @@ public class TestGlobalThrottler { Assert.assertTrue(testQuotaNonZero); } - private List getRowNumbers(List cells) { - List listOfRowNumbers = new ArrayList<>(cells.size()); - for (Cell c : cells) { - listOfRowNumbers.add(Integer.parseInt(Bytes - .toString(c.getRowArray(), c.getRowOffset() + ROW.length, - c.getRowLength() - ROW.length))); - } - return listOfRowNumbers; - } + }