HBASE-18116 Replication source in-memory accounting should not include bulk transfer hfiles
Signed-off-by: Andrew Purtell <apurtell@apache.org>
commit d3e2248f12 (parent 53d29d53c4)
ReplicationSource.java:

@@ -598,6 +598,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
   }
 
   @Override
+  // Offsets totalBufferUsed by deducting the shipped batchSize.
   public void postShipEdits(List<Entry> entries, int batchSize) {
     if (throttler.isEnabled()) {
       throttler.addPushSize(batchSize);
ReplicationSourceShipper.java:

@@ -123,6 +123,18 @@ public class ReplicationSourceShipper extends Thread {
   protected void postFinish() {
   }
 
+  /**
+   * Get the batch entry size, excluding bulk load file sizes.
+   * Uses ReplicationSourceWALReader's static method.
+   */
+  private int getBatchEntrySizeExcludeBulkLoad(WALEntryBatch entryBatch) {
+    int totalSize = 0;
+    for (Entry entry : entryBatch.getWalEntries()) {
+      totalSize += entryReader.getEntrySizeExcludeBulkLoad(entry);
+    }
+    return totalSize;
+  }
+
   /**
    * Do the shipping logic
    */
@@ -139,6 +151,7 @@ public class ReplicationSourceShipper extends Thread {
       return;
     }
     int currentSize = (int) entryBatch.getHeapSize();
+    int sizeExcludeBulkLoad = getBatchEntrySizeExcludeBulkLoad(entryBatch);
     while (isActive()) {
       try {
         try {
@@ -175,7 +188,11 @@ public class ReplicationSourceShipper extends Thread {
         // Log and clean up WAL logs
         updateLogPosition(entryBatch);
 
-        source.postShipEdits(entries, currentSize);
+        // Offsets totalBufferUsed by deducting the shipped batch size (excludes bulk load size).
+        // This sizeExcludeBulkLoad has to use the same calculation as when calling
+        // acquireBufferQuota() in ReplicationSourceWALReader, because they maintain the
+        // same variable: totalBufferUsed.
+        source.postShipEdits(entries, sizeExcludeBulkLoad);
         // FIXME check relationship between wal group and overall
         source.getSourceMetrics().shipBatch(entryBatch.getNbOperations(), currentSize,
           entryBatch.getNbHFiles());
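The heart of the fix is an accounting invariant: totalBufferUsed is one counter shared by the WAL reader, which charges it per entry via acquireBufferQuota(), and the shipper, which credits it back per shipped batch via postShipEdits(). Both sides must therefore size an entry identically, and that size must exclude bulk loaded HFiles, whose bytes are shipped from disk rather than buffered. Below is a minimal standalone sketch of that invariant; BufferQuota, inMemorySize, and the numbers are illustrative stand-ins, not HBase classes:

import java.util.concurrent.atomic.AtomicLong;

// Hypothetical model of the shared totalBufferUsed counter; not HBase's actual classes.
final class BufferQuota {
  private final AtomicLong totalBufferUsed = new AtomicLong();
  private final long quota;

  BufferQuota(long quota) { this.quota = quota; }

  // Reader side: charge an entry's in-memory size; true means the quota is now exceeded.
  boolean acquire(long size) { return totalBufferUsed.addAndGet(size) >= quota; }

  // Shipper side: release exactly what the reader charged for the shipped batch.
  void release(long size) { totalBufferUsed.addAndGet(-size); }

  long used() { return totalBufferUsed.get(); }
}

public class QuotaInvariantDemo {
  // The one size function both sides share (the "exclude bulk load" size):
  // WALEdit heap size plus the serialized WALKey size, never HFile bytes.
  static long inMemorySize(long editHeapSize, long keySerializedSize) {
    return editHeapSize + keySerializedSize;
  }

  public static void main(String[] args) {
    BufferQuota quota = new BufferQuota(200);
    long e1 = inMemorySize(90, 20);   // 110 bytes charged by the reader
    long e2 = inMemorySize(70, 20);   //  90 bytes charged by the reader
    quota.acquire(e1);
    quota.acquire(e2);
    quota.release(e1 + e2);           // shipper releases the same batch total
    System.out.println(quota.used()); // 0 -- no drift, whatever the bulk load sizes were
  }
}

If the release used a different size than the acquire (as mixing currentSize with the quota charge would), the counter would drift and permanently consume quota; that is why the diff routes sizeExcludeBulkLoad, not currentSize, into postShipEdits().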
ReplicationSourceWALReader.java:

@@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.hadoop.hbase.wal.WALEdit;
+import org.apache.hadoop.hbase.wal.WALKey;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.yetus.audience.InterfaceStability;
 import org.slf4j.Logger;
@@ -168,10 +169,12 @@ class ReplicationSourceWALReader extends Thread {
     if (edit == null || edit.isEmpty()) {
       return false;
     }
-    long entrySize = getEntrySize(entry);
+    long entrySize = getEntrySizeIncludeBulkLoad(entry);
+    long entrySizeExcludeBulkLoad = getEntrySizeExcludeBulkLoad(entry);
     batch.addEntry(entry);
     updateBatchStats(batch, entry, entrySize);
-    boolean totalBufferTooLarge = acquireBufferQuota(entrySize);
+    boolean totalBufferTooLarge = acquireBufferQuota(entrySizeExcludeBulkLoad);
 
     // Stop if too many entries or too big
     return totalBufferTooLarge || batch.getHeapSize() >= replicationBatchSizeCapacity ||
       batch.getNbEntries() >= replicationBatchCountCapacity;
@@ -296,11 +299,20 @@ class ReplicationSourceWALReader extends Thread {
     return entryBatchQueue.take();
   }
 
-  private long getEntrySize(Entry entry) {
+  private long getEntrySizeIncludeBulkLoad(Entry entry) {
     WALEdit edit = entry.getEdit();
-    return edit.heapSize() + calculateTotalSizeOfStoreFiles(edit);
+    WALKey key = entry.getKey();
+    return edit.heapSize() + sizeOfStoreFilesIncludeBulkLoad(edit) +
+      key.estimatedSerializedSizeOf();
   }
 
+  public static long getEntrySizeExcludeBulkLoad(Entry entry) {
+    WALEdit edit = entry.getEdit();
+    WALKey key = entry.getKey();
+    return edit.heapSize() + key.estimatedSerializedSizeOf();
+  }
+
   private void updateBatchStats(WALEntryBatch batch, Entry entry, long entrySize) {
     WALEdit edit = entry.getEdit();
     batch.incrementHeapSize(entrySize);
@@ -353,7 +365,7 @@ class ReplicationSourceWALReader extends Thread {
    * @param edit edit to count row keys from
    * @return the total size of the store files
    */
-  private int calculateTotalSizeOfStoreFiles(WALEdit edit) {
+  private int sizeOfStoreFilesIncludeBulkLoad(WALEdit edit) {
     List<Cell> cells = edit.getCells();
     int totalStoreFilesSize = 0;
 
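The two size helpers differ only by the bulk loaded store file bytes, and both now also count the WAL key's estimated serialized size. A worked example with made-up numbers, showing what each figure drives (illustrative only; the variable names mirror the diff above):

public class EntrySizeExample {
  public static void main(String[] args) {
    long editHeapSize  = 400;          // edit.heapSize()
    long keySize       = 50;           // key.estimatedSerializedSizeOf()
    long storeFileSize = 104_857_600;  // a 100 MiB bulk loaded HFile referenced by the edit

    long includeBulkLoad = editHeapSize + storeFileSize + keySize; // batch stats and metrics
    long excludeBulkLoad = editHeapSize + keySize;                 // totalBufferUsed quota

    // includeBulkLoad - excludeBulkLoad == storeFileSize: the HFile bytes are copied
    // from the filesystem during replication, so they never occupy the reader's memory.
    System.out.println(includeBulkLoad - excludeBulkLoad == storeFileSize); // true
  }
}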
TestGlobalThrottler.java:

@@ -64,6 +64,8 @@ public class TestGlobalThrottler {
       HBaseClassTestRule.forClass(TestGlobalThrottler.class);
 
   private static final Logger LOG = LoggerFactory.getLogger(TestGlobalThrottler.class);
+  private static final int REPLICATION_SOURCE_QUOTA = 200;
+  private static int numOfPeer = 0;
   private static Configuration conf1;
   private static Configuration conf2;
 
@@ -84,7 +86,7 @@ public class TestGlobalThrottler {
     conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1");
     conf1.setLong("replication.source.sleepforretries", 100);
     // Each WAL is about 120 bytes
-    conf1.setInt(HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_KEY, 200);
+    conf1.setInt(HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_KEY, REPLICATION_SOURCE_QUOTA);
     conf1.setLong("replication.source.per.peer.node.bandwidth", 100L);
 
     utility1 = new HBaseTestingUtility(conf1);
@@ -109,6 +111,7 @@ public class TestGlobalThrottler {
     admin1.addPeer("peer1", rpc, null);
     admin1.addPeer("peer2", rpc, null);
     admin1.addPeer("peer3", rpc, null);
+    numOfPeer = admin1.getPeersCount();
   }
 
   @AfterClass
@@ -140,7 +143,10 @@ public class TestGlobalThrottler {
       if (size > 0) {
         testQuotaNonZero = true;
       }
-      if (size > 600) {
+      // The reason for "numOfPeer + 1": with addEntryToBatch(), even if the batch size (after
+      // adding the last entry) exceeds the quota, the last entry is still kept in the batch,
+      // so the total used buffer size can be one "replication.total.buffer.quota" larger than expected.
+      if (size > REPLICATION_SOURCE_QUOTA * (numOfPeer + 1)) {
         // We read logs first then check throttler, so if the buffer quota limiter doesn't
         // take effect, it will push many logs and exceed the quota.
         testQuotaPass = false;
@@ -181,13 +187,5 @@ public class TestGlobalThrottler {
     Assert.assertTrue(testQuotaNonZero);
   }
 
-  private List<Integer> getRowNumbers(List<Cell> cells) {
-    List<Integer> listOfRowNumbers = new ArrayList<>(cells.size());
-    for (Cell c : cells) {
-      listOfRowNumbers.add(Integer.parseInt(Bytes
-          .toString(c.getRowArray(), c.getRowOffset() + ROW.length,
-              c.getRowLength() - ROW.length)));
-    }
-    return listOfRowNumbers;
-  }
 }
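A worked check of the new test bound, assuming the three peers registered in setUpBeforeClass (this snippet is illustrative, not code from the commit):

public class QuotaBoundCheck {
  public static void main(String[] args) {
    int quota = 200;                      // REPLICATION_SOURCE_QUOTA
    int numOfPeer = 3;                    // peer1, peer2, peer3
    int bound = quota * (numOfPeer + 1);  // 200 * 4 = 800 bytes

    // Each source may keep the one entry that pushed its batch over the quota, so
    // observed usage can legitimately exceed the nominal global quota; only readings
    // above the bound fail the test (the old hard-coded threshold was 600).
    System.out.println(bound);            // 800
  }
}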