HBASE-18116 Replication source in-memory accounting should not include bulk transfer hfiles
Signed-off-by: Andrew Purtell <apurtell@apache.org>
This commit is contained in:
parent
0968668283
commit
a11701ecc5
|
@ -598,6 +598,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
|
|||
}
|
||||
|
||||
@Override
|
||||
//offsets totalBufferUsed by deducting shipped batchSize.
|
||||
public void postShipEdits(List<Entry> entries, int batchSize) {
|
||||
if (throttler.isEnabled()) {
|
||||
throttler.addPushSize(batchSize);
|
||||
|
|
|
@ -123,6 +123,18 @@ public class ReplicationSourceShipper extends Thread {
|
|||
protected void postFinish() {
|
||||
}
|
||||
|
||||
/**
|
||||
* get batchEntry size excludes bulk load file sizes.
|
||||
* Uses ReplicationSourceWALReader's static method.
|
||||
*/
|
||||
private int getBatchEntrySizeExcludeBulkLoad(WALEntryBatch entryBatch) {
|
||||
int totalSize = 0;
|
||||
for(Entry entry : entryBatch.getWalEntries()) {
|
||||
totalSize += entryReader.getEntrySizeExcludeBulkLoad(entry);
|
||||
}
|
||||
return totalSize;
|
||||
}
|
||||
|
||||
/**
|
||||
* Do the shipping logic
|
||||
*/
|
||||
|
@ -139,6 +151,7 @@ public class ReplicationSourceShipper extends Thread {
|
|||
return;
|
||||
}
|
||||
int currentSize = (int) entryBatch.getHeapSize();
|
||||
int sizeExcludeBulkLoad = getBatchEntrySizeExcludeBulkLoad(entryBatch);
|
||||
while (isActive()) {
|
||||
try {
|
||||
try {
|
||||
|
@ -175,7 +188,11 @@ public class ReplicationSourceShipper extends Thread {
|
|||
// Log and clean up WAL logs
|
||||
updateLogPosition(entryBatch);
|
||||
|
||||
source.postShipEdits(entries, currentSize);
|
||||
//offsets totalBufferUsed by deducting shipped batchSize (excludes bulk load size)
|
||||
//this sizeExcludeBulkLoad has to use same calculation that when calling
|
||||
//acquireBufferQuota() in ReplicatinoSourceWALReader because they maintain
|
||||
//same variable: totalBufferUsed
|
||||
source.postShipEdits(entries, sizeExcludeBulkLoad);
|
||||
// FIXME check relationship between wal group and overall
|
||||
source.getSourceMetrics().shipBatch(entryBatch.getNbOperations(), currentSize,
|
||||
entryBatch.getNbHFiles());
|
||||
|
|
|
@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.util.Pair;
|
|||
import org.apache.hadoop.hbase.util.Threads;
|
||||
import org.apache.hadoop.hbase.wal.WAL.Entry;
|
||||
import org.apache.hadoop.hbase.wal.WALEdit;
|
||||
import org.apache.hadoop.hbase.wal.WALKey;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.apache.yetus.audience.InterfaceStability;
|
||||
import org.slf4j.Logger;
|
||||
|
@ -168,10 +169,12 @@ class ReplicationSourceWALReader extends Thread {
|
|||
if (edit == null || edit.isEmpty()) {
|
||||
return false;
|
||||
}
|
||||
long entrySize = getEntrySize(entry);
|
||||
long entrySize = getEntrySizeIncludeBulkLoad(entry);
|
||||
long entrySizeExcludeBulkLoad = getEntrySizeExcludeBulkLoad(entry);
|
||||
batch.addEntry(entry);
|
||||
updateBatchStats(batch, entry, entrySize);
|
||||
boolean totalBufferTooLarge = acquireBufferQuota(entrySize);
|
||||
boolean totalBufferTooLarge = acquireBufferQuota(entrySizeExcludeBulkLoad);
|
||||
|
||||
// Stop if too many entries or too big
|
||||
return totalBufferTooLarge || batch.getHeapSize() >= replicationBatchSizeCapacity ||
|
||||
batch.getNbEntries() >= replicationBatchCountCapacity;
|
||||
|
@ -296,11 +299,20 @@ class ReplicationSourceWALReader extends Thread {
|
|||
return entryBatchQueue.take();
|
||||
}
|
||||
|
||||
private long getEntrySize(Entry entry) {
|
||||
private long getEntrySizeIncludeBulkLoad(Entry entry) {
|
||||
WALEdit edit = entry.getEdit();
|
||||
return edit.heapSize() + calculateTotalSizeOfStoreFiles(edit);
|
||||
WALKey key = entry.getKey();
|
||||
return edit.heapSize() + sizeOfStoreFilesIncludeBulkLoad(edit) +
|
||||
key.estimatedSerializedSizeOf();
|
||||
}
|
||||
|
||||
public static long getEntrySizeExcludeBulkLoad(Entry entry) {
|
||||
WALEdit edit = entry.getEdit();
|
||||
WALKey key = entry.getKey();
|
||||
return edit.heapSize() + key.estimatedSerializedSizeOf();
|
||||
}
|
||||
|
||||
|
||||
private void updateBatchStats(WALEntryBatch batch, Entry entry, long entrySize) {
|
||||
WALEdit edit = entry.getEdit();
|
||||
batch.incrementHeapSize(entrySize);
|
||||
|
@ -353,7 +365,7 @@ class ReplicationSourceWALReader extends Thread {
|
|||
* @param edit edit to count row keys from
|
||||
* @return the total size of the store files
|
||||
*/
|
||||
private int calculateTotalSizeOfStoreFiles(WALEdit edit) {
|
||||
private int sizeOfStoreFilesIncludeBulkLoad(WALEdit edit) {
|
||||
List<Cell> cells = edit.getCells();
|
||||
int totalStoreFilesSize = 0;
|
||||
|
||||
|
|
|
@ -64,6 +64,8 @@ public class TestGlobalThrottler {
|
|||
HBaseClassTestRule.forClass(TestGlobalThrottler.class);
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(TestGlobalThrottler.class);
|
||||
private static final int REPLICATION_SOURCE_QUOTA = 200;
|
||||
private static int numOfPeer = 0;
|
||||
private static Configuration conf1;
|
||||
private static Configuration conf2;
|
||||
|
||||
|
@ -84,7 +86,7 @@ public class TestGlobalThrottler {
|
|||
conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1");
|
||||
conf1.setLong("replication.source.sleepforretries", 100);
|
||||
// Each WAL is about 120 bytes
|
||||
conf1.setInt(HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_KEY, 200);
|
||||
conf1.setInt(HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_KEY, REPLICATION_SOURCE_QUOTA);
|
||||
conf1.setLong("replication.source.per.peer.node.bandwidth", 100L);
|
||||
|
||||
utility1 = new HBaseTestingUtility(conf1);
|
||||
|
@ -109,6 +111,7 @@ public class TestGlobalThrottler {
|
|||
admin1.addPeer("peer1", rpc, null);
|
||||
admin1.addPeer("peer2", rpc, null);
|
||||
admin1.addPeer("peer3", rpc, null);
|
||||
numOfPeer = admin1.getPeersCount();
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
|
@ -140,7 +143,10 @@ public class TestGlobalThrottler {
|
|||
if (size > 0) {
|
||||
testQuotaNonZero = true;
|
||||
}
|
||||
if (size > 600) {
|
||||
//the reason here doing "numOfPeer + 1" is because by using method addEntryToBatch(), even the
|
||||
// batch size (after added last entry) exceeds quota, it still keeps the last one in the batch
|
||||
// so total used buffer size can be one "replication.total.buffer.quota" larger than expected
|
||||
if (size > REPLICATION_SOURCE_QUOTA * (numOfPeer + 1)) {
|
||||
// We read logs first then check throttler, so if the buffer quota limiter doesn't
|
||||
// take effect, it will push many logs and exceed the quota.
|
||||
testQuotaPass = false;
|
||||
|
@ -181,13 +187,5 @@ public class TestGlobalThrottler {
|
|||
Assert.assertTrue(testQuotaNonZero);
|
||||
}
|
||||
|
||||
private List<Integer> getRowNumbers(List<Cell> cells) {
|
||||
List<Integer> listOfRowNumbers = new ArrayList<>(cells.size());
|
||||
for (Cell c : cells) {
|
||||
listOfRowNumbers.add(Integer.parseInt(Bytes
|
||||
.toString(c.getRowArray(), c.getRowOffset() + ROW.length,
|
||||
c.getRowLength() - ROW.length)));
|
||||
}
|
||||
return listOfRowNumbers;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue