HBASE-18116 fix replication source in-memory calculation by excluding bulk load file
Signed-off-by: Andrew Purtell <apurtell@apache.org>
This commit is contained in:
parent
fc9743c17a
commit
050fae501a
|
@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.util.Pair;
|
|||
import org.apache.hadoop.hbase.util.Threads;
|
||||
import org.apache.hadoop.hbase.wal.WAL.Entry;
|
||||
import org.apache.hadoop.hbase.wal.WALEdit;
|
||||
import org.apache.hadoop.hbase.wal.WALKey;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.apache.yetus.audience.InterfaceStability;
|
||||
import org.slf4j.Logger;
|
||||
|
@ -168,10 +169,12 @@ class ReplicationSourceWALReader extends Thread {
|
|||
if (edit == null || edit.isEmpty()) {
|
||||
return false;
|
||||
}
|
||||
long entrySize = getEntrySize(entry);
|
||||
long entrySize = getEntrySizeIncludeBulkLoad(entry);
|
||||
long entrySizeExlucdeBulkLoad = getEntrySizeExcludeBulkLoad(entry);
|
||||
batch.addEntry(entry);
|
||||
updateBatchStats(batch, entry, entrySize);
|
||||
boolean totalBufferTooLarge = acquireBufferQuota(entrySize);
|
||||
boolean totalBufferTooLarge = acquireBufferQuota(entrySizeExlucdeBulkLoad);
|
||||
|
||||
// Stop if too many entries or too big
|
||||
return totalBufferTooLarge || batch.getHeapSize() >= replicationBatchSizeCapacity ||
|
||||
batch.getNbEntries() >= replicationBatchCountCapacity;
|
||||
|
@ -296,11 +299,20 @@ class ReplicationSourceWALReader extends Thread {
|
|||
return entryBatchQueue.take();
|
||||
}
|
||||
|
||||
private long getEntrySize(Entry entry) {
|
||||
private long getEntrySizeIncludeBulkLoad(Entry entry) {
|
||||
WALEdit edit = entry.getEdit();
|
||||
return edit.heapSize() + calculateTotalSizeOfStoreFiles(edit);
|
||||
WALKey key = entry.getKey();
|
||||
return edit.heapSize() + sizeOfStoreFilesIncludeBulkLoad(edit) +
|
||||
key.estimatedSerializedSizeOf();
|
||||
}
|
||||
|
||||
private long getEntrySizeExcludeBulkLoad(Entry entry) {
|
||||
WALEdit edit = entry.getEdit();
|
||||
WALKey key = entry.getKey();
|
||||
return edit.heapSize() + key.estimatedSerializedSizeOf();
|
||||
}
|
||||
|
||||
|
||||
private void updateBatchStats(WALEntryBatch batch, Entry entry, long entrySize) {
|
||||
WALEdit edit = entry.getEdit();
|
||||
batch.incrementHeapSize(entrySize);
|
||||
|
@ -353,7 +365,7 @@ class ReplicationSourceWALReader extends Thread {
|
|||
* @param edit edit to count row keys from
|
||||
* @return the total size of the store files
|
||||
*/
|
||||
private int calculateTotalSizeOfStoreFiles(WALEdit edit) {
|
||||
private int sizeOfStoreFilesIncludeBulkLoad(WALEdit edit) {
|
||||
List<Cell> cells = edit.getCells();
|
||||
int totalStoreFilesSize = 0;
|
||||
|
||||
|
|
Loading…
Reference in New Issue