Revert "HBASE-18116 fix replication source in-memory calculation by excluding bulk load file"

This reverts commit dcb5b5a012.
This commit is contained in:
Andrew Purtell 2018-05-31 15:28:50 -07:00
parent dcb5b5a012
commit 08b7ed1656
1 changed files with 7 additions and 15 deletions

View File

@ -39,16 +39,17 @@ import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo; import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.WALEntryFilter; import org.apache.hadoop.hbase.replication.WALEntryFilter;
import org.apache.hadoop.hbase.replication.regionserver.WALEntryStream.WALEntryStreamRuntimeException; import org.apache.hadoop.hbase.replication.regionserver.WALEntryStream.WALEntryStreamRuntimeException;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALKey;
/** /**
* Reads and filters WAL entries, groups the filtered entries into batches, and puts the batches onto a queue * Reads and filters WAL entries, groups the filtered entries into batches, and puts the batches onto a queue
@ -144,11 +145,10 @@ public class ReplicationSourceWALReaderThread extends Thread {
if (entry != null) { if (entry != null) {
WALEdit edit = entry.getEdit(); WALEdit edit = entry.getEdit();
if (edit != null && !edit.isEmpty()) { if (edit != null && !edit.isEmpty()) {
long entrySize = getEntrySizeIncludeBulkLoad(entry); long entrySize = getEntrySize(entry);
long entrySizeExlucdeBulkLoad = getEntrySizeExcludeBulkLoad(entry);
batch.addEntry(entry); batch.addEntry(entry);
updateBatchStats(batch, entry, entryStream.getPosition(), entrySize); updateBatchStats(batch, entry, entryStream.getPosition(), entrySize);
boolean totalBufferTooLarge = acquireBufferQuota(entrySizeExlucdeBulkLoad); boolean totalBufferTooLarge = acquireBufferQuota(entrySize);
// Stop if too many entries or too big // Stop if too many entries or too big
if (totalBufferTooLarge || batch.getHeapSize() >= replicationBatchSizeCapacity if (totalBufferTooLarge || batch.getHeapSize() >= replicationBatchSizeCapacity
|| batch.getNbEntries() >= replicationBatchCountCapacity) { || batch.getNbEntries() >= replicationBatchCountCapacity) {
@ -251,17 +251,9 @@ public class ReplicationSourceWALReaderThread extends Thread {
return entryBatchQueue.take(); return entryBatchQueue.take();
} }
private long getEntrySizeIncludeBulkLoad(Entry entry) { private long getEntrySize(Entry entry) {
WALEdit edit = entry.getEdit(); WALEdit edit = entry.getEdit();
WALKey key = entry.getKey(); return edit.heapSize() + calculateTotalSizeOfStoreFiles(edit);
return edit.heapSize() + sizeOfStoreFilesIncludeBulkLoad(edit) +
key.estimatedSerializedSizeOf();
}
private long getEntrySizeExcludeBulkLoad(Entry entry) {
WALEdit edit = entry.getEdit();
WALKey key = entry.getKey();
return edit.heapSize() + key.estimatedSerializedSizeOf();
} }
private void updateBatchStats(WALEntryBatch batch, Entry entry, long entryPosition, long entrySize) { private void updateBatchStats(WALEntryBatch batch, Entry entry, long entryPosition, long entrySize) {
@ -319,7 +311,7 @@ public class ReplicationSourceWALReaderThread extends Thread {
* @param edit edit to count row keys from * @param edit edit to count row keys from
* @return the total size of the store files * @return the total size of the store files
*/ */
private int sizeOfStoreFilesIncludeBulkLoad(WALEdit edit) { private int calculateTotalSizeOfStoreFiles(WALEdit edit) {
List<Cell> cells = edit.getCells(); List<Cell> cells = edit.getCells();
int totalStoreFilesSize = 0; int totalStoreFilesSize = 0;