diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index 5acb70922f8..c586142b64c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -616,7 +616,6 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf try { WALEntryBatch entryBatch = entryReader.take(); shipEdits(entryBatch); - releaseBufferQuota((int) entryBatch.getHeapSize()); if (!entryBatch.hasMoreEntries()) { LOG.debug("Finished recovering queue for group " + walGroupId + " of peer " + peerClusterZnode); @@ -777,6 +776,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf if (throttler.isEnabled()) { throttler.addPushSize(sizeExcludeBulkLoad); } + releaseBufferQuota(sizeExcludeBulkLoad); totalReplicatedEdits.addAndGet(entries.size()); totalReplicatedOperations.addAndGet(entryBatch.getNbOperations()); // FIXME check relationship between wal group and overall diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java index b4ac71bf5c8..84d2d8b6bf0 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java @@ -35,6 +35,7 @@ import static org.mockito.internal.verification.VerificationModeFactory.times; import java.io.IOException; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.NavigableMap; @@ -54,6 +55,7 @@ import org.apache.hadoop.hbase.CoordinatedStateManager; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.MiniHBaseCluster; import org.apache.hadoop.hbase.Stoppable; @@ -61,6 +63,8 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.Waiter; import org.apache.hadoop.hbase.Waiter.Predicate; import org.apache.hadoop.hbase.client.replication.ReplicationAdmin; +import org.apache.hadoop.hbase.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.protobuf.generated.WALProtos; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint; @@ -69,7 +73,9 @@ import org.apache.hadoop.hbase.replication.regionserver.Replication; import org.apache.hadoop.hbase.replication.regionserver.ReplicationSource; import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager; import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.ByteStringer; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.HFileTestUtil; import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WALFactory; import org.apache.hadoop.hbase.wal.WALKey; @@ -263,11 +269,12 @@ public class TestReplicationSource { private final MetricsSource metrics = mock(MetricsSource.class); private final ReplicationPeer peer = mock(ReplicationPeer.class); private final ReplicationEndpoint.Context context = mock(ReplicationEndpoint.Context.class); + private final AtomicLong totalBufferUsed = new AtomicLong(); private Mocks() { when(peers.getStatusOfPeer(anyString())).thenReturn(true); when(context.getReplicationPeer()).thenReturn(peer); - when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong(0)); + when(manager.getTotalBufferUsed()).thenReturn(totalBufferUsed); } ReplicationSource createReplicationSourceWithMocks(ReplicationEndpoint endpoint) @@ -278,6 +285,10 @@ public class TestReplicationSource { "testPeerClusterZnode", UUID.randomUUID(), endpoint, metrics); return source; } + + public AtomicLong getTotalBufferUsed() { + return totalBufferUsed; + } } @Test @@ -327,6 +338,59 @@ public class TestReplicationSource { assertThat(positionCaptor.getValue(), is(pos)); } + @Test + public void testUpdateQuotaWhenBulkLoad() throws Exception { + byte[] cfBytes = Bytes.toBytes("cf"); + TableName tableName = TableName.valueOf("test_table"); + Path dir = TEST_UTIL.getDataTestDirOnTestFS(tableName.getNameAsString()); + Map storeFilesSize = new HashMap<>(1); + Map> storeFiles = new HashMap<>(1); + int numRows = 10; + + Path familyDir = new Path(dir, Bytes.toString(cfBytes)); + Path hfilePath = new Path(familyDir, "test_hfile"); + HFileTestUtil.createHFile(conf, FS, hfilePath, cfBytes, cfBytes, + Bytes.toBytes("a"), Bytes.toBytes("z"), numRows); + storeFilesSize.put(hfilePath.getName(), FS.getFileStatus(hfilePath).getLen()); + storeFiles.put(cfBytes, Collections.singletonList(hfilePath)); + + HRegionInfo regionInfo = new HRegionInfo(tableName); + WALProtos.BulkLoadDescriptor loadDescriptor = ProtobufUtil + .toBulkLoadDescriptor(tableName, ByteStringer.wrap(Bytes.toBytes("test_region")), + storeFiles, storeFilesSize, 1, null); + WALEdit edit = WALEdit.createBulkLoadEvent(regionInfo, loadDescriptor); + + final ReplicationEndpointForTest endpoint = new ReplicationEndpointForTest() { + @Override + public WALEntryFilter getWALEntryfilter() { + return null; + } + }; + final Path log = new Path(logDir, "log.1"); + + WALProvider.Writer writer = WALFactory.createWALWriter(FS, log, TEST_UTIL.getConfiguration()); + WALKey key = new WALKey(regionInfo.getEncodedNameAsBytes(), tableName, 0, 0, + HConstants.DEFAULT_CLUSTER_ID); + WAL.Entry bulkLoadEventEntry = new WAL.Entry(key, edit); + WAL.Entry entryWithoutCells = new WAL.Entry(key, new WALEdit()); + writer.append(bulkLoadEventEntry); + writer.append(entryWithoutCells); + writer.close(); + + Mocks mocks = new Mocks(); + final ReplicationSource source = mocks.createReplicationSourceWithMocks(endpoint); + source.run(); + + source.enqueueLog(log); + Waiter.waitFor(conf, 20000, new Waiter.Predicate() { + @Override public boolean evaluate() throws Exception { + return endpoint.replicateCount.get() > 0; + } + }); + + assertEquals(0L, mocks.getTotalBufferUsed().get()); + } + @Test public void testSetLogPositionAndRemoveOldWALsEvenIfEmptyWALsRolled() throws Exception { Mocks mocks = new Mocks();