HBASE-18116 Replication source in-memory accounting should not include bulk transfer hfiles

Signed-off-by: Andrew Purtell <apurtell@apache.org>
Amending-Author: Andrew Purtell <apurtell@apache.org>
Author: Xu Cang
Date: 2018-05-31 20:00:04 -07:00
Committer: Andrew Purtell
parent f1eda99451
commit a25878cf40
3 changed files with 38 additions and 35 deletions
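In short: before this change, the replication source charged the full WAL entry size, including the sizes of any bulk-loaded HFiles the entry references, against both its bandwidth throttler and the global buffer quota ("replication.total.buffer.quota"). The HFiles themselves are transferred as files rather than buffered as WAL cells in the source, so counting them inflates the in-memory accounting. The fix computes two sizes per entry. A minimal sketch of the distinction, with names shortened from the methods added in ReplicationSourceWALReaderThread below (not the literal HBase code):

    // Charged against the throttler and the global buffer quota: only what
    // actually occupies memory, the edit's heap size plus the serialized key size.
    long sizeExcludeBulkLoad(Entry entry) {
      return entry.getEdit().heapSize() + entry.getKey().estimatedSerializedSizeOf();
    }

    // Used for batch statistics: additionally counts the store files
    // referenced by bulk load entries in the edit.
    long sizeIncludeBulkLoad(Entry entry) {
      return sizeExcludeBulkLoad(entry) + sizeOfStoreFilesIncludeBulkLoad(entry.getEdit());
    }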

File: ReplicationSource.java

@@ -72,10 +72,6 @@ import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.wal.DefaultWALProvider;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
-import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.Service;

 /**
  * Class that handles the source of a replication stream.
  * Currently does not handle more than 1 slave
@@ -628,6 +624,18 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterface {
       }
     }

+  /**
+   * Get the batch entry size, excluding bulk load file sizes.
+   * Uses ReplicationSourceWALReaderThread's getEntrySizeExcludeBulkLoad().
+   */
+  private int getBatchEntrySizeExcludeBulkLoad(WALEntryBatch entryBatch) {
+    int totalSize = 0;
+    for (Entry entry : entryBatch.getWalEntries()) {
+      totalSize += entryReader.getEntrySizeExcludeBulkLoad(entry);
+    }
+    return totalSize;
+  }
+
   /**
    * Do the shipping logic
    */
@@ -646,11 +654,12 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterface {
       return;
     }
     int currentSize = (int) entryBatch.getHeapSize();
+    int sizeExcludeBulkLoad = getBatchEntrySizeExcludeBulkLoad(entryBatch);
     while (isWorkerActive()) {
       try {
         checkBandwidthChangeAndResetThrottler();
         if (throttler.isEnabled()) {
-          long sleepTicks = throttler.getNextSleepInterval(currentSize);
+          long sleepTicks = throttler.getNextSleepInterval(sizeExcludeBulkLoad);
           if (sleepTicks > 0) {
             try {
               if (LOG.isTraceEnabled()) {
@@ -696,7 +705,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterface {
           updateLogPosition(lastReadPosition);
         }
         if (throttler.isEnabled()) {
-          throttler.addPushSize(currentSize);
+          throttler.addPushSize(sizeExcludeBulkLoad);
         }
         totalReplicatedEdits.addAndGet(entries.size());
         totalReplicatedOperations.addAndGet(entryBatch.getNbOperations());
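Note that currentSize, the batch's full heap size, is still computed and remains in place for the rest of the shipping logic; only the throttler's sleep-interval calculation and its push-size accounting switch to sizeExcludeBulkLoad, since the bulk-loaded HFiles never occupy the source's buffer.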

File: ReplicationSourceWALReaderThread.java

@@ -39,17 +39,16 @@ import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
-import org.apache.hadoop.hbase.protobuf.generated.WALProtos;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
 import org.apache.hadoop.hbase.replication.WALEntryFilter;
 import org.apache.hadoop.hbase.replication.regionserver.WALEntryStream.WALEntryStreamRuntimeException;
-import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.hadoop.hbase.wal.WALKey;

 /**
  * Reads and filters WAL entries, groups the filtered entries into batches, and puts the batches onto a queue
@@ -145,10 +144,11 @@ public class ReplicationSourceWALReaderThread extends Thread {
         if (entry != null) {
           WALEdit edit = entry.getEdit();
           if (edit != null && !edit.isEmpty()) {
-            long entrySize = getEntrySize(entry);
+            long entrySize = getEntrySizeIncludeBulkLoad(entry);
+            long entrySizeExcludeBulkLoad = getEntrySizeExcludeBulkLoad(entry);
             batch.addEntry(entry);
             updateBatchStats(batch, entry, entryStream.getPosition(), entrySize);
-            boolean totalBufferTooLarge = acquireBufferQuota(entrySize);
+            boolean totalBufferTooLarge = acquireBufferQuota(entrySizeExcludeBulkLoad);
             // Stop if too many entries or too big
             if (totalBufferTooLarge || batch.getHeapSize() >= replicationBatchSizeCapacity
                 || batch.getNbEntries() >= replicationBatchCountCapacity) {
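Here only the global buffer quota (acquireBufferQuota) switches to the bulk-load-exclusive size; updateBatchStats still receives the inclusive size, so batch statistics and downstream metrics continue to reflect the full replication payload, HFiles included.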
@@ -251,9 +251,17 @@ public class ReplicationSourceWALReaderThread extends Thread {
     return entryBatchQueue.take();
   }

-  private long getEntrySize(Entry entry) {
+  public long getEntrySizeIncludeBulkLoad(Entry entry) {
     WALEdit edit = entry.getEdit();
-    return edit.heapSize() + calculateTotalSizeOfStoreFiles(edit);
+    WALKey key = entry.getKey();
+    return edit.heapSize() + sizeOfStoreFilesIncludeBulkLoad(edit) +
+        key.estimatedSerializedSizeOf();
+  }
+
+  public long getEntrySizeExcludeBulkLoad(Entry entry) {
+    WALEdit edit = entry.getEdit();
+    WALKey key = entry.getKey();
+    return edit.heapSize() + key.estimatedSerializedSizeOf();
   }

   private void updateBatchStats(WALEntryBatch batch, Entry entry, long entryPosition, long entrySize) {
@@ -311,7 +319,7 @@ public class ReplicationSourceWALReaderThread extends Thread {
    * @param edit edit to count store file sizes from
    * @return the total size of the store files
    */
-  private int calculateTotalSizeOfStoreFiles(WALEdit edit) {
+  private int sizeOfStoreFilesIncludeBulkLoad(WALEdit edit) {
     List<Cell> cells = edit.getCells();
     int totalStoreFilesSize = 0;

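For context, the renamed helper (its body is unchanged by this commit and elided above) scans the edit's cells for bulk load marker entries and sums the store file sizes recorded in each BulkLoadDescriptor. A sketch of its shape, assuming the class's existing imports (Cell, CellUtil, WALEdit, BulkLoadDescriptor, StoreDescriptor, IOException):

    private int sizeOfStoreFilesIncludeBulkLoad(WALEdit edit) {
      List<Cell> cells = edit.getCells();
      int totalStoreFilesSize = 0;
      for (Cell cell : cells) {
        // Bulk load events are marked with the WALEdit.BULK_LOAD qualifier.
        if (CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
          try {
            BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
            for (StoreDescriptor store : bld.getStoresList()) {
              // Descriptor records sizes as long; the accounting here uses int.
              totalStoreFilesSize += (int) store.getStoreFileSizeBytes();
            }
          } catch (IOException e) {
            LOG.error("Failed to deserialize bulk load entry from WAL edit", e);
          }
        }
      }
      return totalStoreFilesSize;
    }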
File: TestGlobalThrottler.java

@@ -20,14 +20,11 @@
 package org.apache.hadoop.hbase.replication.regionserver;

 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
 import java.util.concurrent.atomic.AtomicLong;

 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HColumnDescriptor;
@@ -53,13 +50,13 @@ import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
-import org.junit.Ignore;
 import org.junit.experimental.categories.Category;

 @Category({ ReplicationTests.class, LargeTests.class })
-@Ignore("See HBASE-20496")
 public class TestGlobalThrottler {
   private static final Log LOG = LogFactory.getLog(TestGlobalThrottler.class);
+  private static final int REPLICATION_SOURCE_QUOTA = 200;
+  private static int numOfPeer = 0;
   private static Configuration conf1;
   private static Configuration conf2;
@@ -98,6 +95,7 @@ public class TestGlobalThrottler {
     admin1.addPeer("peer1", rpc, null);
     admin1.addPeer("peer2", rpc, null);
     admin1.addPeer("peer3", rpc, null);
+    numOfPeer = admin1.getPeersCount();

     utility1.startMiniCluster(1, 1);
     utility2.startMiniCluster(1, 1);
@@ -109,9 +107,8 @@ public class TestGlobalThrottler {
     utility1.shutdownMiniCluster();
   }

   volatile private boolean testQuotaPass = false;
-  volatile private boolean testQuotaNonZero = false;

   @Test
   public void testQuota() throws IOException {
     TableName tableName = TableName.valueOf("testQuota");
@@ -131,10 +128,10 @@ public class TestGlobalThrottler {
       testQuotaPass = true;
       while (!Thread.interrupted()) {
         long size = bufferUsed.get();
-        if (size > 0) {
-          testQuotaNonZero = true;
-        }
-        if (size > 600) {
+        // Use "numOfPeer + 1" because addEntryToBatch() keeps the last entry in a batch
+        // even when adding it pushes the batch size over the quota, so the total used
+        // buffer can be one "replication.total.buffer.quota" larger than expected.
+        if (size > REPLICATION_SOURCE_QUOTA * (numOfPeer + 1)) {
           // We read logs first then check throttler, so if the buffer quota limiter doesn't
           // take effect, it will push many logs and exceed the quota.
           testQuotaPass = false;
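Worked out with the values above, and assuming the quota configured in the test setup matches REPLICATION_SOURCE_QUOTA: with a quota of 200 and the three peers registered in setUpBeforeClass, the threshold becomes 200 * (3 + 1) = 800 bytes, replacing the previous hardcoded 600 (200 * 3), which left no headroom for the one-batch overshoot described in the comment.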
@@ -174,16 +171,5 @@ public class TestGlobalThrottler {
     watcher.interrupt();
     Assert.assertTrue(testQuotaPass);
-    Assert.assertTrue(testQuotaNonZero);
-  }
-
-  private List<Integer> getRowNumbers(List<Cell> cells) {
-    List<Integer> listOfRowNumbers = new ArrayList<>();
-    for (Cell c : cells) {
-      listOfRowNumbers.add(Integer.parseInt(Bytes
-          .toString(c.getRowArray(), c.getRowOffset() + ROW.length,
-          c.getRowLength() - ROW.length)));
-    }
-    return listOfRowNumbers;
   }
 }