HBASE-26229 Limit count and size of L0 files compaction in StripeCompactionPolicy (#3646)
Signed-off-by: Duo Zhang <zhangduo@apache.org>
This commit is contained in:
parent
922d0666a3
commit
8d2b995352
|
@ -22,7 +22,6 @@ package org.apache.hadoop.hbase.regionserver.compactions;
|
|||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.regionserver.HStoreFile;
|
||||
import org.apache.hadoop.hbase.regionserver.StoreConfigInformation;
|
||||
|
@ -120,10 +119,29 @@ public class ExploringCompactionPolicy extends RatioBasedCompactionPolicy {
|
|||
}
|
||||
LOG.debug("Exploring compaction algorithm has selected {} files of size {} starting at " +
|
||||
"candidate #{} after considering {} permutations with {} in ratio", bestSelection.size(),
|
||||
bestSize, bestSize, opts, optsInRatio);
|
||||
bestSize, bestStart, opts, optsInRatio);
|
||||
return new ArrayList<>(bestSelection);
|
||||
}
|
||||
|
||||
/**
|
||||
* Select at least one file in the candidates list to compact, through choosing files
|
||||
* from the head to the index that the accumulation length larger the max compaction size.
|
||||
* This method is a supplementary of the selectSimpleCompaction() method, aims to make sure
|
||||
* at least one file can be selected to compact, for compactions like L0 files, which need to
|
||||
* compact all files and as soon as possible.
|
||||
*/
|
||||
public List<HStoreFile> selectCompactFiles(final List<HStoreFile> candidates, int maxFiles,
|
||||
boolean isOffpeak) {
|
||||
long selectedSize = 0L;
|
||||
for (int end = 0; end < Math.min(candidates.size(), maxFiles); end++) {
|
||||
selectedSize += candidates.get(end).getReader().length();
|
||||
if (selectedSize >= comConf.getMaxCompactSize(isOffpeak)) {
|
||||
return candidates.subList(0, end + 1);
|
||||
}
|
||||
}
|
||||
return candidates;
|
||||
}
|
||||
|
||||
private boolean isBetterSelection(List<HStoreFile> bestSelection, long bestSize,
|
||||
List<HStoreFile> selection, long size, boolean mightBeStuck) {
|
||||
if (mightBeStuck && bestSize > 0 && size > 0) {
|
||||
|
|
|
@ -19,7 +19,6 @@
|
|||
package org.apache.hadoop.hbase.regionserver.compactions;
|
||||
|
||||
import static org.apache.hadoop.hbase.regionserver.StripeStoreFileManager.OPEN_KEY;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
|
@ -130,10 +129,12 @@ public class StripeCompactionPolicy extends CompactionPolicy {
|
|||
List<HStoreFile> l0Files = si.getLevel0Files();
|
||||
|
||||
// See if we need to make new stripes.
|
||||
boolean shouldCompactL0 = (this.config.getLevel0MinFiles() <= l0Files.size());
|
||||
boolean shouldCompactL0 = this.config.getLevel0MinFiles() <= l0Files.size();
|
||||
if (stripeCount == 0) {
|
||||
if (!shouldCompactL0) return null; // nothing to do.
|
||||
return selectNewStripesCompaction(si);
|
||||
if (!shouldCompactL0) {
|
||||
return null; // nothing to do.
|
||||
}
|
||||
return selectL0OnlyCompaction(si);
|
||||
}
|
||||
|
||||
boolean canDropDeletesNoL0 = l0Files.isEmpty();
|
||||
|
@ -141,16 +142,20 @@ public class StripeCompactionPolicy extends CompactionPolicy {
|
|||
if (!canDropDeletesNoL0) {
|
||||
// If we need to compact L0, see if we can add something to it, and drop deletes.
|
||||
StripeCompactionRequest result = selectSingleStripeCompaction(
|
||||
si, true, canDropDeletesNoL0, isOffpeak);
|
||||
if (result != null) return result;
|
||||
si, !shouldSelectL0Files(si), canDropDeletesNoL0, isOffpeak);
|
||||
if (result != null) {
|
||||
return result;
|
||||
}
|
||||
}
|
||||
LOG.debug("Selecting L0 compaction with " + l0Files.size() + " files");
|
||||
return new BoundaryStripeCompactionRequest(l0Files, si.getStripeBoundaries());
|
||||
return selectL0OnlyCompaction(si);
|
||||
}
|
||||
|
||||
// Try to delete fully expired stripes
|
||||
StripeCompactionRequest result = selectExpiredMergeCompaction(si, canDropDeletesNoL0);
|
||||
if (result != null) return result;
|
||||
if (result != null) {
|
||||
return result;
|
||||
}
|
||||
|
||||
// Ok, nothing special here, let's see if we need to do a common compaction.
|
||||
// This will also split the stripes that are too big if needed.
|
||||
|
@ -200,7 +205,7 @@ public class StripeCompactionPolicy extends CompactionPolicy {
|
|||
// If we want to compact L0 to drop deletes, we only want whole-stripe compactions.
|
||||
// So, pass includeL0 as 2nd parameter to indicate that.
|
||||
List<HStoreFile> selection = selectSimpleCompaction(stripes.get(i),
|
||||
!canDropDeletesWithoutL0 && includeL0, isOffpeak);
|
||||
!canDropDeletesWithoutL0 && includeL0, isOffpeak, false);
|
||||
if (selection.isEmpty()) continue;
|
||||
long size = 0;
|
||||
for (HStoreFile sf : selection) {
|
||||
|
@ -268,21 +273,46 @@ public class StripeCompactionPolicy extends CompactionPolicy {
|
|||
* @return The resulting selection.
|
||||
*/
|
||||
private List<HStoreFile> selectSimpleCompaction(
|
||||
List<HStoreFile> sfs, boolean allFilesOnly, boolean isOffpeak) {
|
||||
List<HStoreFile> sfs, boolean allFilesOnly, boolean isOffpeak, boolean forceCompact) {
|
||||
int minFilesLocal = Math.max(
|
||||
allFilesOnly ? sfs.size() : 0, this.config.getStripeCompactMinFiles());
|
||||
int maxFilesLocal = Math.max(this.config.getStripeCompactMaxFiles(), minFilesLocal);
|
||||
return stripePolicy.applyCompactionPolicy(sfs, false, isOffpeak, minFilesLocal, maxFilesLocal);
|
||||
List<HStoreFile> selected = stripePolicy.applyCompactionPolicy(sfs, false,
|
||||
isOffpeak, minFilesLocal, maxFilesLocal);
|
||||
if (forceCompact && (selected == null || selected.isEmpty()) && !sfs.isEmpty()) {
|
||||
return stripePolicy.selectCompactFiles(sfs, maxFilesLocal, isOffpeak);
|
||||
}
|
||||
return selected;
|
||||
}
|
||||
|
||||
private StripeCompactionRequest selectNewStripesCompaction(StripeInformationProvider si) {
|
||||
private boolean shouldSelectL0Files(StripeInformationProvider si) {
|
||||
return si.getLevel0Files().size() > this.config.getStripeCompactMaxFiles() ||
|
||||
getTotalFileSize(si.getLevel0Files()) > comConf.getMaxCompactSize();
|
||||
}
|
||||
|
||||
private StripeCompactionRequest selectL0OnlyCompaction(StripeInformationProvider si) {
|
||||
List<HStoreFile> l0Files = si.getLevel0Files();
|
||||
Pair<Long, Integer> kvsAndCount = estimateTargetKvs(l0Files, config.getInitialCount());
|
||||
LOG.debug("Creating " + kvsAndCount.getSecond() + " initial stripes with "
|
||||
+ kvsAndCount.getFirst() + " kvs each via L0 compaction of " + l0Files.size() + " files");
|
||||
SplitStripeCompactionRequest request = new SplitStripeCompactionRequest(
|
||||
si.getLevel0Files(), OPEN_KEY, OPEN_KEY, kvsAndCount.getSecond(), kvsAndCount.getFirst());
|
||||
request.setMajorRangeFull(); // L0 only, can drop deletes.
|
||||
List<HStoreFile> selectedFiles = l0Files;
|
||||
if (shouldSelectL0Files(si)) {
|
||||
selectedFiles = selectSimpleCompaction(l0Files, false, false, true);
|
||||
assert !selectedFiles.isEmpty() : "Selected L0 files should not be empty";
|
||||
}
|
||||
StripeCompactionRequest request;
|
||||
if (si.getStripeCount() == 0) {
|
||||
Pair<Long, Integer> estimate = estimateTargetKvs(selectedFiles, config.getInitialCount());
|
||||
long targetKvs = estimate.getFirst();
|
||||
int targetCount = estimate.getSecond();
|
||||
request =
|
||||
new SplitStripeCompactionRequest(selectedFiles, OPEN_KEY, OPEN_KEY, targetCount, targetKvs);
|
||||
if (selectedFiles.size() == l0Files.size()) {
|
||||
((SplitStripeCompactionRequest) request).setMajorRangeFull(); // L0 only, can drop deletes.
|
||||
}
|
||||
LOG.debug("Creating {} initial stripes with {} kvs each via L0 compaction of {}/{} files",
|
||||
targetCount, targetKvs, selectedFiles.size(), l0Files.size());
|
||||
} else {
|
||||
request = new BoundaryStripeCompactionRequest(selectedFiles, si.getStripeBoundaries());
|
||||
LOG.debug("Boundary L0 compaction of {}/{} files", selectedFiles.size(), l0Files.size());
|
||||
}
|
||||
return request;
|
||||
}
|
||||
|
||||
|
|
|
@ -17,9 +17,12 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.regionserver.compactions;
|
||||
|
||||
import static org.apache.hadoop.hbase.regionserver.StripeStoreConfig.MAX_FILES_KEY;
|
||||
import static org.apache.hadoop.hbase.regionserver.StripeStoreFileManager.OPEN_KEY;
|
||||
import static org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_SIZE_KEY;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.mockito.AdditionalMatchers.aryEq;
|
||||
|
@ -36,7 +39,6 @@ import static org.mockito.Mockito.only;
|
|||
import static org.mockito.Mockito.times;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
|
@ -88,7 +90,6 @@ import org.junit.runners.Parameterized;
|
|||
import org.junit.runners.Parameterized.Parameter;
|
||||
import org.junit.runners.Parameterized.Parameters;
|
||||
import org.mockito.ArgumentMatcher;
|
||||
|
||||
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
|
||||
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
|
||||
|
||||
|
@ -268,6 +269,41 @@ public class TestStripeCompactionPolicy {
|
|||
verifyCompaction(policy, si, si.getStorefiles(), true, 6, 5L, OPEN_KEY, OPEN_KEY, true);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSelectL0Compaction() throws Exception {
|
||||
//test select ALL L0 files when L0 files count > MIN_FILES_L0_KEY
|
||||
Configuration conf = HBaseConfiguration.create();
|
||||
conf.setInt(StripeStoreConfig.MIN_FILES_L0_KEY, 4);
|
||||
StripeCompactionPolicy policy = createPolicy(conf);
|
||||
StripeCompactionPolicy.StripeInformationProvider si = createStripesWithSizes(10, 10L,
|
||||
new Long[] { 5L, 1L, 1L }, new Long[] { 3L, 2L, 2L }, new Long[] { 3L, 2L, 2L });
|
||||
StripeCompactionPolicy.StripeCompactionRequest cr = policy.selectCompaction(si, al(), false);
|
||||
assertNotNull(cr);
|
||||
assertEquals(10, cr.getRequest().getFiles().size());
|
||||
verifyCollectionsEqual(si.getLevel0Files(), cr.getRequest().getFiles());
|
||||
|
||||
// test select partial L0 files when size of L0 files > HBASE_HSTORE_COMPACTION_MAX_SIZE_KEY
|
||||
conf.setLong(HBASE_HSTORE_COMPACTION_MAX_SIZE_KEY, 100L);
|
||||
policy = createPolicy(conf);
|
||||
si = createStripesWithSizes(5, 50L,
|
||||
new Long[] { 5L, 1L, 1L }, new Long[] { 3L, 2L, 2L }, new Long[] { 3L, 2L, 2L });
|
||||
cr = policy.selectCompaction(si, al(), false);
|
||||
assertNotNull(cr);
|
||||
assertEquals(2, cr.getRequest().getFiles().size());
|
||||
verifyCollectionsEqual(si.getLevel0Files().subList(0, 2), cr.getRequest().getFiles());
|
||||
|
||||
// test select partial L0 files when count of L0 files > MAX_FILES_KEY
|
||||
conf.setInt(MAX_FILES_KEY, 6);
|
||||
conf.setLong(HBASE_HSTORE_COMPACTION_MAX_SIZE_KEY, 1000L);
|
||||
policy = createPolicy(conf);
|
||||
si = createStripesWithSizes(10, 10L,
|
||||
new Long[] { 5L, 1L, 1L }, new Long[] { 3L, 2L, 2L }, new Long[] { 3L, 2L, 2L });
|
||||
cr = policy.selectCompaction(si, al(), false);
|
||||
assertNotNull(cr);
|
||||
assertEquals(6, cr.getRequest().getFiles().size());
|
||||
verifyCollectionsEqual(si.getLevel0Files().subList(0, 6), cr.getRequest().getFiles());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testExistingStripesFromL0() throws Exception {
|
||||
Configuration conf = HBaseConfiguration.create();
|
||||
|
|
Loading…
Reference in New Issue