HBASE-3209 : New Compaction Algorithm

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1033302 13f79535-47bb-0310-9956-ffa450edef68

parent 07ec5bff7b
commit fb4b83d41c
@@ -1128,6 +1128,8 @@ Release 0.90.0 - Unreleased
               TTL to allow major even if single file
    HBASE-3194  HBase should run on both secure and vanilla versions of Hadoop 0.20
                (Gary Helmling via Stack)
+   HBASE-3209  New Compaction Algorithm
+               (Nicolas Spiegelberg via Stack)
 
  NEW FEATURES
 
@@ -90,7 +90,11 @@ public class Store implements HeapSize {
   // ttl in milliseconds.
   protected long ttl;
   private long majorCompactionTime;
-  private int maxFilesToCompact;
+  private final int maxFilesToCompact;
+  private final long minCompactSize;
+  // compactRatio: double on purpose!  Float.MAX < Long.MAX < Double.MAX
+  // With float, java will downcast your long to float for comparisons (bad)
+  private double compactRatio;
   private long lastCompactSize = 0;
   /* how many bytes to write between status checks */
   static int closeCheckInterval = 0;
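
The "double on purpose" comment deserves a concrete illustration: when a long is compared against a float, Java's binary numeric promotion converts the long operand to float, whose 24-bit mantissa cannot represent every long. A minimal standalone demo of the hazard (illustration only; the class is invented and not part of this patch):

    public class FloatDowncastDemo {
      public static void main(String[] args) {
        long big = (1L << 24) + 1;  // 16777217: the first long a float cannot represent
        float f = 1L << 24;         // 16777216.0f
        // Promotion downcasts `big` to 16777216.0f, so the comparison wrongly succeeds.
        System.out.println(big == f);  // true
        double d = 1L << 24;
        // A double's 53-bit mantissa holds this long exactly, so the comparison is correct.
        System.out.println(big == d);  // false
      }
    }

Store-file sizes are longs of this magnitude and beyond, which is why the ratio math stays in doubles.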
@@ -174,8 +178,8 @@ public class Store implements HeapSize {
 
     // By default, we compact if an HStore has more than
     // MIN_COMMITS_FOR_COMPACTION map files
-    this.compactionThreshold =
-      conf.getInt("hbase.hstore.compactionThreshold", 3);
+    this.compactionThreshold = Math.max(2,
+      conf.getInt("hbase.hstore.compactionThreshold", 3));
 
     // Check if this is in-memory store
     this.inMemory = family.isInMemory();
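
The new Math.max guard clamps the configured threshold to at least 2: a compaction needs at least two input files, so a misconfigured hbase.hstore.compactionThreshold of 0 or 1 can no longer request single-file minor compactions.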
@@ -193,6 +197,9 @@ public class Store implements HeapSize {
     this.majorCompactionTime = getNextMajorCompactTime();
 
     this.maxFilesToCompact = conf.getInt("hbase.hstore.compaction.max", 10);
+    this.minCompactSize = this.region.memstoreFlushSize * 3 / 2; // +50% pad
+    this.compactRatio = conf.getFloat("hbase.hstore.compaction.ratio", 1.2F);
+
     if (Store.closeCheckInterval == 0) {
       Store.closeCheckInterval = conf.getInt(
         "hbase.hstore.close.check.interval", 10*1000*1000 /* 10 MB */);
@@ -595,13 +602,13 @@ public class Store implements HeapSize {
    * <p>We don't want to hold the structureLock for the whole time, as a compact()
    * can be lengthy and we want to allow cache-flushes during this period.
    *
-   * @param mc True to force a major compaction regardless of thresholds
+   * @param forceMajor True to force a major compaction regardless of thresholds
    * @return row to split around if a split is needed, null otherwise
    * @throws IOException
    */
-  StoreSize compact(final boolean mc) throws IOException {
+  StoreSize compact(final boolean forceMajor) throws IOException {
     boolean forceSplit = this.region.shouldSplit(false);
-    boolean majorcompaction = mc;
+    boolean majorcompaction = forceMajor;
     synchronized (compactLock) {
       this.lastCompactSize = 0;
 
@@ -612,9 +619,6 @@ public class Store implements HeapSize {
         return null;
       }
 
-      // Max-sequenceID is the last key of the storefiles TreeMap
-      long maxId = StoreFile.getMaxSequenceIdInList(storefiles);
-
       // Check to see if we need to do a major compaction on this region.
       // If so, change doMajorCompaction to true to skip the incremental
       // compacting below.  Only check if doMajorCompaction is not true.
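
(The removed maxId computation is not gone: it reappears after file selection in the hunk below, so the sequence id is taken from the files actually chosen for compaction rather than from every file in the store.)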
@@ -628,73 +632,104 @@ public class Store implements HeapSize {
         return checkSplit(forceSplit);
       }
 
-      // HBASE-745, preparing all store file sizes for incremental compacting
-      // selection.
+      /* get store file sizes for incremental compacting selection.
+       * normal skew:
+       *
+       *         older ----> newer
+       *     _
+       *    | |   _
+       *    | |  | |   _
+       *  --|-|- |-|- |-|---_-------_-------  minCompactSize
+       *    | |  | |  | |  | |  _  | |
+       *    | |  | |  | |  | | | | | |
+       *    | |  | |  | |  | | | | | |
+       */
       int countOfFiles = filesToCompact.size();
-      long totalSize = 0;
       long [] fileSizes = new long[countOfFiles];
-      long skipped = 0;
-      int point = 0;
-      for (int i = 0; i < countOfFiles; i++) {
+      long [] sumSize = new long[countOfFiles];
+      for (int i = countOfFiles-1; i >= 0; --i) {
         StoreFile file = filesToCompact.get(i);
         Path path = file.getPath();
         if (path == null) {
-          LOG.warn("Path is null for " + file);
+          LOG.error("Path is null for " + file);
           return null;
         }
         StoreFile.Reader r = file.getReader();
         if (r == null) {
-          LOG.warn("StoreFile " + file + " has a null Reader");
+          LOG.error("StoreFile " + file + " has a null Reader");
           return null;
         }
-        long len = file.getReader().length();
-        fileSizes[i] = len;
-        totalSize += len;
+        fileSizes[i] = file.getReader().length();
+        // calculate the sum of fileSizes[i,i+maxFilesToCompact-1) for algo
+        int tooFar = i + this.maxFilesToCompact - 1;
+        sumSize[i] = fileSizes[i]
+                   + ((i+1    < countOfFiles) ? sumSize[i+1]      : 0)
+                   - ((tooFar < countOfFiles) ? fileSizes[tooFar] : 0);
       }
 
+      long totalSize = 0;
       if (!majorcompaction && !references) {
-        // Here we select files for incremental compaction.
-        // The rule is: if the largest(oldest) one is more than twice the
-        // size of the second, skip the largest, and continue to next...,
-        // until we meet the compactionThreshold limit.
-
-        // A problem with the above heuristic is that we could go through all of
-        // filesToCompact and the above condition could hold for all files and
-        // we'd end up with nothing to compact.  To protect against this, we'll
-        // compact the tail -- up to the last 4 files -- of filesToCompact
-        // regardless.
-        int tail = Math.min(countOfFiles, 4);
-        for (point = 0; point < (countOfFiles - tail); point++) {
-          if (((fileSizes[point] < fileSizes[point + 1] * 2) &&
-               (countOfFiles - point) <= maxFilesToCompact)) {
-            break;
-          }
-          skipped += fileSizes[point];
-        }
-        filesToCompact = new ArrayList<StoreFile>(filesToCompact.subList(point,
-          countOfFiles));
-        if (filesToCompact.size() <= 1) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Skipped compaction of 1 file; compaction size of " +
-              this.storeNameStr + ": " +
-              StringUtils.humanReadableInt(totalSize) + "; Skipped " + point +
-              " files, size: " + skipped);
-          }
-          return checkSplit(forceSplit);
-        }
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Compaction size of " + this.storeNameStr + ": " +
-            StringUtils.humanReadableInt(totalSize) + "; Skipped " + point +
-            " file(s), size: " + skipped);
-        }
+        // we're doing a minor compaction, let's see what files are applicable
+        int start = 0;
+        double r = this.compactRatio;
+
+        /* Start at the oldest file and stop when you find the first file that
+         * meets compaction criteria:
+         *   (1) a recently-flushed, small file (i.e. <= minCompactSize)
+         *      OR
+         *   (2) within the compactRatio of sum(newer_files)
+         * Given normal skew, any newer files will also meet this criteria
+         *
+         * Additional Note:
+         * If fileSizes.size() >> maxFilesToCompact, we will recurse on
+         * compact().  Consider the oldest files first to avoid a
+         * situation where we always compact [end-threshold,end).  Then, the
+         * last file becomes an aggregate of the previous compactions.
+         */
+        while(countOfFiles - start >= this.compactionThreshold &&
+              fileSizes[start] >
+                Math.max(minCompactSize, (long)(sumSize[start+1] * r))) {
+          ++start;
+        }
+        int end = Math.min(countOfFiles, start + this.maxFilesToCompact);
+        totalSize = fileSizes[start]
+                  + ((start+1 < countOfFiles) ? sumSize[start+1] : 0);
+
+        // if we don't have enough files to compact, just wait
+        if (end - start < this.compactionThreshold) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Skipped compaction of " + this.storeNameStr
+              + ".  Only " + (end - start) + " file(s) of size "
+              + StringUtils.humanReadableInt(totalSize)
+              + " meet compaction criteria.");
+          }
+          return checkSplit(forceSplit);
+        }
+
+        if (0 == start && end == countOfFiles) {
+          // we decided all the files were candidates! major compact
+          majorcompaction = true;
+        } else {
+          filesToCompact = new ArrayList<StoreFile>(filesToCompact.subList(start,
+            end));
+        }
+      } else {
+        // all files included in this compaction
+        for (long i : fileSizes) {
+          totalSize += i;
+        }
       }
-      this.lastCompactSize = totalSize - skipped;
+      this.lastCompactSize = totalSize;
+
+      // Max-sequenceID is the last key in the files we're compacting
+      long maxId = StoreFile.getMaxSequenceIdInList(filesToCompact);
 
       // Ready to go.  Have list of files to compact.
       LOG.info("Started compaction of " + filesToCompact.size() + " file(s) in " +
         this.storeNameStr + " of " + this.region.getRegionInfo().getRegionNameAsString() +
         (references? ", hasReferences=true,": " ") + " into " +
-          region.getTmpDir() + ", sequenceid=" + maxId);
+          region.getTmpDir() + ", seqid=" + maxId +
+          ", totalSize=" + StringUtils.humanReadableInt(totalSize));
       StoreFile.Writer writer = compact(filesToCompact, majorcompaction, maxId);
       // Move the compaction into place.
       StoreFile sf = completeCompaction(filesToCompact, writer);
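
To see the new selection rule end to end, here is a self-contained sketch of the sumSize bookkeeping and the skip loop from the hunk above, run against made-up file sizes (the class name and data are illustrative only; variable names mirror the patch):

    import java.util.Arrays;

    public class CompactSelectionSketch {
      public static void main(String[] args) {
        long[] fileSizes = {900, 80, 70, 60, 50};  // oldest -> newest, arbitrary units
        int compactionThreshold = 3;               // hbase.hstore.compactionThreshold
        int maxFilesToCompact = 10;                // hbase.hstore.compaction.max
        long minCompactSize = 100;                 // stand-in for memstoreFlushSize * 3/2
        double compactRatio = 1.2;                 // hbase.hstore.compaction.ratio

        int countOfFiles = fileSizes.length;
        // sumSize[i] = sum of fileSizes[i, i+maxFilesToCompact-1), as in the patch
        long[] sumSize = new long[countOfFiles];
        for (int i = countOfFiles - 1; i >= 0; --i) {
          int tooFar = i + maxFilesToCompact - 1;
          sumSize[i] = fileSizes[i]
              + ((i + 1 < countOfFiles) ? sumSize[i + 1] : 0)
              - ((tooFar < countOfFiles) ? fileSizes[tooFar] : 0);
        }
        System.out.println("sumSize = " + Arrays.toString(sumSize)); // [1160, 260, 180, 110, 50]

        // skip older files that are too big relative to the newer ones
        int start = 0;
        while (countOfFiles - start >= compactionThreshold &&
               fileSizes[start] >
                 Math.max(minCompactSize, (long) (sumSize[start + 1] * compactRatio))) {
          ++start;
        }
        int end = Math.min(countOfFiles, start + maxFilesToCompact);
        // file 0 (900) > max(100, 1.2 * 260) = 312, so it is skipped;
        // file 1 (80) fails the skip test, so files [1, 5) are compacted together.
        System.out.println("compact files [" + start + ", " + end + ")");
      }
    }

Because sumSize[start+1] only covers the next maxFilesToCompact - 1 files and end is capped at start + maxFilesToCompact, one pass never selects more than maxFilesToCompact files; older leftovers are handled by the recursion described in the Additional Note.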
@@ -702,8 +737,9 @@ public class Store implements HeapSize {
       LOG.info("Completed" + (majorcompaction? " major ": " ") +
         "compaction of " + filesToCompact.size() + " file(s) in " +
         this.storeNameStr + " of " + this.region.getRegionInfo().getRegionNameAsString() +
-        "; new storefile is " + (sf == null? "none": sf.toString()) +
-        "; store size is " + StringUtils.humanReadableInt(storeSize));
+        "; new storefile name=" + (sf == null? "none": sf.toString()) +
+        ", size=" + (sf == null? "none": StringUtils.humanReadableInt(sf.getReader().length())) +
+        "; total size for store is " + StringUtils.humanReadableInt(storeSize));
     }
   }
   return checkSplit(forceSplit);
@@ -856,8 +892,15 @@ public class Store implements HeapSize {
       if (r != null) {
         // NOTE: getFilterEntries could cause under-sized blooms if the user
         //       switches bloom type (e.g. from ROW to ROWCOL)
-        maxKeyCount += (r.getBloomFilterType() == family.getBloomFilterType())
+        long keyCount = (r.getBloomFilterType() == family.getBloomFilterType())
           ? r.getFilterEntries() : r.getEntries();
+        maxKeyCount += keyCount;
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Compacting: " + file +
+            "; keyCount = " + keyCount +
+            "; Bloom Type = " + r.getBloomFilterType().toString() +
+            "; Size = " + StringUtils.humanReadableInt(r.length()) );
+        }
       }
     }
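
Pulling the ternary out into keyCount leaves maxKeyCount unchanged; the point is to make the per-file estimate available to the new debug line, so operators can see how each input file's bloom type and entry count feed the sizing of the compacted file's bloom filter.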
@@ -1486,7 +1529,7 @@ public class Store implements HeapSize {
 
   public static final long FIXED_OVERHEAD = ClassSize.align(
       ClassSize.OBJECT + (15 * ClassSize.REFERENCE) +
-      (5 * Bytes.SIZEOF_LONG) +
+      (6 * Bytes.SIZEOF_LONG) + (1 * Bytes.SIZEOF_DOUBLE) +
       (4 * Bytes.SIZEOF_INT) + (Bytes.SIZEOF_BOOLEAN * 2));
 
   public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD +
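
The FIXED_OVERHEAD bump is exactly the accounting for the fields added at the top of the class: one more long for minCompactSize (5 -> 6 * SIZEOF_LONG) plus one double for compactRatio.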