HBASE-25603 Add switch for compaction after bulkload (#2982)

Signed-off-by: Duo Zhang <zhangduo@apache.org>
Signed-off-by: Wellington Chevreuil <wchevreuil@apache.org>
This commit is contained in:
niuyulin 2021-03-03 20:09:08 -06:00 committed by GitHub
parent 5d9a6ed1fe
commit e80b901e47
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 64 additions and 32 deletions

View File

@ -593,6 +593,13 @@ possible configurations would overwhelm and obscure the important.
to atomic bulk loads are attempted in the face of splitting operations
0 means never give up.</description>
</property>
<property>
<name>hbase.compaction.after.bulkload.enable</name>
<value>true</value>
<description>Request Compaction after bulkload immediately.
If bulkload is continuous, the triggered compactions may increase load,
bring about performance side effect.</description>
</property>
<property>
<name>hbase.master.balancer.maxRitPercent</name>
<value>1.0</value>

View File

@ -244,6 +244,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
public static final String WAL_HSYNC_CONF_KEY = "hbase.wal.hsync";
public static final boolean DEFAULT_WAL_HSYNC = false;
/** Parameter name for compaction after bulkload */
public static final String COMPACTION_AFTER_BULKLOAD_ENABLE =
"hbase.compaction.after.bulkload.enable";
/**
* This is for for using HRegion as a local storage, where we may put the recovered edits in a
* special place. Once this is set, we will only replay the recovered edits under this directory
@ -7025,19 +7029,23 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
isSuccessful = true;
//request compaction
familyWithFinalPath.keySet().forEach(family -> {
HStore store = getStore(family);
try {
if (this.rsServices != null && store.needsCompaction()) {
this.rsServices.getCompactionRequestor().requestCompaction(this, store,
"bulkload hfiles request compaction", Store.PRIORITY_USER + 1,
CompactionLifeCycleTracker.DUMMY, null);
if (conf.getBoolean(COMPACTION_AFTER_BULKLOAD_ENABLE, true)) {
// request compaction
familyWithFinalPath.keySet().forEach(family -> {
HStore store = getStore(family);
try {
if (this.rsServices != null && store.needsCompaction()) {
this.rsServices.getCompactionRequestor().requestCompaction(this, store,
"bulkload hfiles request compaction", Store.PRIORITY_USER + 1,
CompactionLifeCycleTracker.DUMMY, null);
LOG.debug("bulkload hfiles request compaction region : {}, family : {}",
this.getRegionInfo(), family);
}
} catch (IOException e) {
LOG.error("bulkload hfiles request compaction error ", e);
}
} catch (IOException e) {
LOG.error("bulkload hfiles request compaction error ", e);
}
});
});
}
} finally {
if (wal != null && !storeFiles.isEmpty()) {
// Write a bulk load event for hfiles that are loaded

View File

@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.regionserver;
import static org.apache.hadoop.hbase.regionserver.HRegion.COMPACTION_AFTER_BULKLOAD_ENABLE;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.eq;
@ -84,27 +85,32 @@ public class TestCompactionAfterBulkLoad extends TestBulkloadBase {
for (int i = 0; i < 5; i++) {
familyPaths.addAll(withFamilyPathsFor(family1, family2, family3));
}
when(regionServerServices.getConfiguration()).thenReturn(conf);
when(regionServerServices.getCompactionRequestor()).thenReturn(compactionRequester);
when(log.appendMarker(any(), any(), argThat(bulkLogWalEditType(WALEdit.BULK_LOAD))))
.thenAnswer(new Answer() {
@Override
public Object answer(InvocationOnMock invocation) {
WALKeyImpl walKey = invocation.getArgument(1);
MultiVersionConcurrencyControl mvcc = walKey.getMvcc();
if (mvcc != null) {
MultiVersionConcurrencyControl.WriteEntry we = mvcc.begin();
walKey.setWriteEntry(we);
try {
conf.setBoolean(COMPACTION_AFTER_BULKLOAD_ENABLE, true);
when(regionServerServices.getConfiguration()).thenReturn(conf);
when(regionServerServices.getCompactionRequestor()).thenReturn(compactionRequester);
when(log.appendMarker(any(), any(), argThat(bulkLogWalEditType(WALEdit.BULK_LOAD))))
.thenAnswer(new Answer() {
@Override
public Object answer(InvocationOnMock invocation) {
WALKeyImpl walKey = invocation.getArgument(1);
MultiVersionConcurrencyControl mvcc = walKey.getMvcc();
if (mvcc != null) {
MultiVersionConcurrencyControl.WriteEntry we = mvcc.begin();
walKey.setWriteEntry(we);
}
return 01L;
}
return 01L;
}
});
});
Mockito.doNothing().when(compactionRequester).requestCompaction(any(), any(), any(), anyInt(),
any(), any());
testRegionWithFamilies(family1, family2, family3).bulkLoadHFiles(familyPaths, false, null);
// invoke three times for 3 families
verify(compactionRequester, times(3)).requestCompaction(isA(HRegion.class), isA(HStore.class),
isA(String.class), anyInt(), eq(CompactionLifeCycleTracker.DUMMY), eq(null));
Mockito.doNothing().when(compactionRequester).requestCompaction(any(), any(), any(), anyInt(),
any(), any());
testRegionWithFamilies(family1, family2, family3).bulkLoadHFiles(familyPaths, false, null);
// invoke three times for 3 families
verify(compactionRequester, times(3)).requestCompaction(isA(HRegion.class), isA(HStore.class),
isA(String.class), anyInt(), eq(CompactionLifeCycleTracker.DUMMY), eq(null));
} finally {
conf.setBoolean(COMPACTION_AFTER_BULKLOAD_ENABLE, false);
}
}
}

View File

@ -756,6 +756,17 @@ Maximum retries. This is a maximum number of iterations
`10`
[[hbase.compaction.after.bulkload.enable]]
*`hbase.compaction.after.bulkload.enable`*::
+
.Description
Request Compaction after bulkload immediately.
If bulkload is continuous, the triggered compactions may increase load,
bring about performance side effect.
+
.Default
`true`
[[hbase.balancer.period
]]
*`hbase.balancer.period