HBASE-23254 Release replication buffer quota correctly, when batch includes bulk loaded hfiles (#792)

Signed-off-by: Wellington Chevreuil <wchevreuil@apache.org>
Jeongdae Kim 2020-01-21 18:23:55 +09:00 committed by Wellington Ramos Chevreuil
parent 2e3273f6ea
commit 8bf4985d69
2 changed files with 66 additions and 2 deletions
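
Why this matters: the replication source tracks outstanding batch bytes in a shared counter (totalBufferUsed) so a global quota can cap the memory held by pending replication. That bookkeeping only balances if the figure subtracted on release is the same figure added on acquire. Before this change, batches were released using entryBatch.getHeapSize() while shipping was accounted in sizeExcludeBulkLoad, so the counter could drift whenever bulk-loaded hfiles were replicated. A minimal sketch of the invariant (plain Java, not HBase code; the sizes are made up, and the assumption that the acquire side uses the bulk-load-exclusive metric is inferred from the fix below):

import java.util.concurrent.atomic.AtomicLong;

public class BufferQuotaSketch {
  // Stand-in for ReplicationSourceManager's shared counter.
  static final AtomicLong totalBufferUsed = new AtomicLong();

  static void acquireBufferQuota(long size) { totalBufferUsed.addAndGet(size); }
  static void releaseBufferQuota(long size) { totalBufferUsed.addAndGet(-size); }

  public static void main(String[] args) {
    long sizeExcludeBulkLoad = 300;           // bytes of WAL cells in the batch (hypothetical)
    long heapSizeWithBulkLoad = 300 + 10_000; // same batch once a bulk-load payload is counted

    acquireBufferQuota(sizeExcludeBulkLoad);
    releaseBufferQuota(heapSizeWithBulkLoad);    // mismatched release: counter drifts
    System.out.println(totalBufferUsed.get());   // prints -10000, not 0

    totalBufferUsed.set(0);
    acquireBufferQuota(sizeExcludeBulkLoad);
    releaseBufferQuota(sizeExcludeBulkLoad);     // symmetric release, as after the fix
    System.out.println(totalBufferUsed.get());   // prints 0
  }
}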

ReplicationSource.java

@@ -616,7 +616,6 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
       try {
         WALEntryBatch entryBatch = entryReader.take();
         shipEdits(entryBatch);
-        releaseBufferQuota((int) entryBatch.getHeapSize());
         if (!entryBatch.hasMoreEntries()) {
           LOG.debug("Finished recovering queue for group "
               + walGroupId + " of peer " + peerClusterZnode);
@@ -777,6 +776,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
         if (throttler.isEnabled()) {
           throttler.addPushSize(sizeExcludeBulkLoad);
         }
+        releaseBufferQuota(sizeExcludeBulkLoad);
         totalReplicatedEdits.addAndGet(entries.size());
         totalReplicatedOperations.addAndGet(entryBatch.getNbOperations());
         // FIXME check relationship between wal group and overall
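
The hunks above move the release out of the recovered-queue loop, where it used entryBatch.getHeapSize(), into shipEdits, right next to the throttler update that already works with sizeExcludeBulkLoad. The two figures can part ways exactly when a batch carries a bulk-load descriptor: the referenced hfiles contribute to the batch's heap accounting but not to the size of the cells actually shipped (the direction of the mismatch is an assumption for illustration). A rough sketch of that divergence, with a hypothetical accumulator and made-up sizes, not the real WALEntryBatch:

class BatchSizes {
  long heapSize;            // grows with every entry, bulk-load payloads included
  long sizeExcludeBulkLoad; // grows only with the WAL cells shipped to the peer

  void addEntry(long cellBytes, long bulkLoadBytes) {
    heapSize += cellBytes + bulkLoadBytes;
    sizeExcludeBulkLoad += cellBytes;
  }
}

public class BatchSizeSketch {
  public static void main(String[] args) {
    BatchSizes batch = new BatchSizes();
    batch.addEntry(200, 0);         // an ordinary edit
    batch.addEntry(150, 1_000_000); // a bulk-load event referencing a ~1 MB hfile
    // Releasing heapSize would subtract ~1 MB that was never charged for shipping;
    // releasing sizeExcludeBulkLoad matches what the throttler saw.
    System.out.println(batch.heapSize - batch.sizeExcludeBulkLoad); // prints 1000000
  }
}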

TestReplicationSource.java

@@ -35,6 +35,7 @@ import static org.mockito.internal.verification.VerificationModeFactory.times;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
@@ -54,6 +55,7 @@ import org.apache.hadoop.hbase.CoordinatedStateManager;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.MiniHBaseCluster;
 import org.apache.hadoop.hbase.Stoppable;
@@ -61,6 +63,8 @@ import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.Waiter;
 import org.apache.hadoop.hbase.Waiter.Predicate;
 import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
@@ -69,7 +73,9 @@ import org.apache.hadoop.hbase.replication.regionserver.Replication;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationSource;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.ByteStringer;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.HFileTestUtil;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.wal.WALFactory;
 import org.apache.hadoop.hbase.wal.WALKey;
@@ -263,11 +269,12 @@ public class TestReplicationSource {
     private final MetricsSource metrics = mock(MetricsSource.class);
     private final ReplicationPeer peer = mock(ReplicationPeer.class);
     private final ReplicationEndpoint.Context context = mock(ReplicationEndpoint.Context.class);
+    private final AtomicLong totalBufferUsed = new AtomicLong();

     private Mocks() {
       when(peers.getStatusOfPeer(anyString())).thenReturn(true);
       when(context.getReplicationPeer()).thenReturn(peer);
-      when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
+      when(manager.getTotalBufferUsed()).thenReturn(totalBufferUsed);
     }

     ReplicationSource createReplicationSourceWithMocks(ReplicationEndpoint endpoint)
@@ -278,6 +285,10 @@ public class TestReplicationSource {
         "testPeerClusterZnode", UUID.randomUUID(), endpoint, metrics);
       return source;
     }
+
+    public AtomicLong getTotalBufferUsed() {
+      return totalBufferUsed;
+    }
   }

   @Test
@@ -327,6 +338,59 @@ public class TestReplicationSource {
     assertThat(positionCaptor.getValue(), is(pos));
   }

+  @Test
+  public void testUpdateQuotaWhenBulkLoad() throws Exception {
+    byte[] cfBytes = Bytes.toBytes("cf");
+    TableName tableName = TableName.valueOf("test_table");
+    Path dir = TEST_UTIL.getDataTestDirOnTestFS(tableName.getNameAsString());
+    Map<String, Long> storeFilesSize = new HashMap<>(1);
+    Map<byte[], List<Path>> storeFiles = new HashMap<>(1);
+    int numRows = 10;
+    Path familyDir = new Path(dir, Bytes.toString(cfBytes));
+    Path hfilePath = new Path(familyDir, "test_hfile");
+    HFileTestUtil.createHFile(conf, FS, hfilePath, cfBytes, cfBytes,
+      Bytes.toBytes("a"), Bytes.toBytes("z"), numRows);
+    storeFilesSize.put(hfilePath.getName(), FS.getFileStatus(hfilePath).getLen());
+    storeFiles.put(cfBytes, Collections.singletonList(hfilePath));
+    HRegionInfo regionInfo = new HRegionInfo(tableName);
+    WALProtos.BulkLoadDescriptor loadDescriptor = ProtobufUtil
+      .toBulkLoadDescriptor(tableName, ByteStringer.wrap(Bytes.toBytes("test_region")),
+        storeFiles, storeFilesSize, 1, null);
+    WALEdit edit = WALEdit.createBulkLoadEvent(regionInfo, loadDescriptor);
+
+    final ReplicationEndpointForTest endpoint = new ReplicationEndpointForTest() {
+      @Override
+      public WALEntryFilter getWALEntryfilter() {
+        return null;
+      }
+    };
+    final Path log = new Path(logDir, "log.1");
+    WALProvider.Writer writer = WALFactory.createWALWriter(FS, log, TEST_UTIL.getConfiguration());
+    WALKey key = new WALKey(regionInfo.getEncodedNameAsBytes(), tableName, 0, 0,
+      HConstants.DEFAULT_CLUSTER_ID);
+    WAL.Entry bulkLoadEventEntry = new WAL.Entry(key, edit);
+    WAL.Entry entryWithoutCells = new WAL.Entry(key, new WALEdit());
+    writer.append(bulkLoadEventEntry);
+    writer.append(entryWithoutCells);
+    writer.close();
+
+    Mocks mocks = new Mocks();
+    final ReplicationSource source = mocks.createReplicationSourceWithMocks(endpoint);
+    source.run();
+    source.enqueueLog(log);
+    Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
+      @Override public boolean evaluate() throws Exception {
+        return endpoint.replicateCount.get() > 0;
+      }
+    });
+    assertEquals(0L, mocks.getTotalBufferUsed().get());
+  }
+
   @Test
   public void testSetLogPositionAndRemoveOldWALsEvenIfEmptyWALsRolled() throws Exception {
     Mocks mocks = new Mocks();
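
A side note on the test plumbing: the fix is observable only because the Mocks helper now shares a single AtomicLong between the stubbed manager and the final assertion; acquire and release both flow through that one counter, so any imbalance survives as a non-zero value. A condensed, self-contained sketch of the pattern (the Manager interface here is a stand-in for ReplicationSourceManager, not real HBase API):

import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.util.concurrent.atomic.AtomicLong;

public class SharedCounterMockSketch {
  // Stand-in for the buffer-accounting surface of ReplicationSourceManager.
  interface Manager {
    AtomicLong getTotalBufferUsed();
  }

  public static void main(String[] args) {
    AtomicLong totalBufferUsed = new AtomicLong();
    Manager manager = mock(Manager.class);
    // Hand the test-owned counter to the code under test...
    when(manager.getTotalBufferUsed()).thenReturn(totalBufferUsed);

    // ...exercise an acquire and a matching release through the mock...
    manager.getTotalBufferUsed().addAndGet(512);
    manager.getTotalBufferUsed().addAndGet(-512);

    // ...and assert the books balance, exactly as the new test does.
    assertEquals(0L, totalBufferUsed.get());
  }
}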