diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java index 64fd48df900..ee549318bfe 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java @@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.hadoop.hbase.wal.WALEdit; +import org.apache.hadoop.hbase.wal.WALKey; import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceStability; import org.slf4j.Logger; @@ -168,10 +169,12 @@ class ReplicationSourceWALReader extends Thread { if (edit == null || edit.isEmpty()) { return false; } - long entrySize = getEntrySize(entry); + long entrySize = getEntrySizeIncludeBulkLoad(entry); + long entrySizeExlucdeBulkLoad = getEntrySizeExcludeBulkLoad(entry); batch.addEntry(entry); updateBatchStats(batch, entry, entrySize); - boolean totalBufferTooLarge = acquireBufferQuota(entrySize); + boolean totalBufferTooLarge = acquireBufferQuota(entrySizeExlucdeBulkLoad); + // Stop if too many entries or too big return totalBufferTooLarge || batch.getHeapSize() >= replicationBatchSizeCapacity || batch.getNbEntries() >= replicationBatchCountCapacity; @@ -296,11 +299,20 @@ class ReplicationSourceWALReader extends Thread { return entryBatchQueue.take(); } - private long getEntrySize(Entry entry) { + private long getEntrySizeIncludeBulkLoad(Entry entry) { WALEdit edit = entry.getEdit(); - return edit.heapSize() + calculateTotalSizeOfStoreFiles(edit); + WALKey key = entry.getKey(); + return edit.heapSize() + sizeOfStoreFilesIncludeBulkLoad(edit) + + key.estimatedSerializedSizeOf(); } + private long getEntrySizeExcludeBulkLoad(Entry entry) { + WALEdit edit = entry.getEdit(); + WALKey key = entry.getKey(); + return edit.heapSize() + key.estimatedSerializedSizeOf(); + } + + private void updateBatchStats(WALEntryBatch batch, Entry entry, long entrySize) { WALEdit edit = entry.getEdit(); batch.incrementHeapSize(entrySize); @@ -353,7 +365,7 @@ class ReplicationSourceWALReader extends Thread { * @param edit edit to count row keys from * @return the total size of the store files */ - private int calculateTotalSizeOfStoreFiles(WALEdit edit) { + private int sizeOfStoreFilesIncludeBulkLoad(WALEdit edit) { List cells = edit.getCells(); int totalStoreFilesSize = 0;