HBASE-3209 : New Compaction Algorithm
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1033305 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
46ab084208
commit
1eb39d194b
|
@ -94,10 +94,10 @@ public class Store implements HeapSize {
|
|||
private final long minCompactSize;
|
||||
// compactRatio: double on purpose! Float.MAX < Long.MAX < Double.MAX
|
||||
// With float, java will downcast your long to float for comparisons (bad)
|
||||
private double compactRatio;
|
||||
private double compactRatio;
|
||||
private long lastCompactSize = 0;
|
||||
/* how many bytes to write between status checks */
|
||||
static int closeCheckInterval = 0;
|
||||
static int closeCheckInterval = 0;
|
||||
private final long desiredMaxFileSize;
|
||||
private final int blockingStoreFileCount;
|
||||
private volatile long storeSize = 0L;
|
||||
|
@ -156,10 +156,10 @@ public class Store implements HeapSize {
|
|||
this.blockcache = family.isBlockCacheEnabled();
|
||||
this.blocksize = family.getBlocksize();
|
||||
this.compression = family.getCompression();
|
||||
// avoid overriding compression setting for major compactions if the user
|
||||
// avoid overriding compression setting for major compactions if the user
|
||||
// has not specified it separately
|
||||
this.compactionCompression =
|
||||
(family.getCompactionCompression() != Compression.Algorithm.NONE) ?
|
||||
(family.getCompactionCompression() != Compression.Algorithm.NONE) ?
|
||||
family.getCompactionCompression() : this.compression;
|
||||
this.comparator = info.getComparator();
|
||||
// getTimeToLive returns ttl in seconds. Convert to milliseconds.
|
||||
|
@ -634,15 +634,15 @@ public class Store implements HeapSize {
|
|||
|
||||
/* get store file sizes for incremental compacting selection.
|
||||
* normal skew:
|
||||
*
|
||||
*
|
||||
* older ----> newer
|
||||
* _
|
||||
* | | _
|
||||
* | | | | _
|
||||
* --|-|- |-|- |-|---_-------_------- minCompactSize
|
||||
* | | | | | | | | _ | |
|
||||
* | | | | | | | | | | | |
|
||||
* | | | | | | | | | | | |
|
||||
* | | _
|
||||
* | | | | _
|
||||
* --|-|- |-|- |-|---_-------_------- minCompactSize
|
||||
* | | | | | | | | _ | |
|
||||
* | | | | | | | | | | | |
|
||||
* | | | | | | | | | | | |
|
||||
*/
|
||||
int countOfFiles = filesToCompact.size();
|
||||
long [] fileSizes = new long[countOfFiles];
|
||||
|
@ -662,7 +662,7 @@ public class Store implements HeapSize {
|
|||
fileSizes[i] = file.getReader().length();
|
||||
// calculate the sum of fileSizes[i,i+maxFilesToCompact-1) for algo
|
||||
int tooFar = i + this.maxFilesToCompact - 1;
|
||||
sumSize[i] = fileSizes[i]
|
||||
sumSize[i] = fileSizes[i]
|
||||
+ ((i+1 < countOfFiles) ? sumSize[i+1] : 0)
|
||||
- ((tooFar < countOfFiles) ? fileSizes[tooFar] : 0);
|
||||
}
|
||||
|
@ -672,40 +672,40 @@ public class Store implements HeapSize {
|
|||
// we're doing a minor compaction, let's see what files are applicable
|
||||
int start = 0;
|
||||
double r = this.compactRatio;
|
||||
|
||||
|
||||
/* Start at the oldest file and stop when you find the first file that
|
||||
* meets compaction criteria:
|
||||
* (1) a recently-flushed, small file (i.e. <= minCompactSize)
|
||||
* OR
|
||||
* (2) within the compactRatio of sum(newer_files)
|
||||
* Given normal skew, any newer files will also meet this criteria
|
||||
*
|
||||
*
|
||||
* Additional Note:
|
||||
* If fileSizes.size() >> maxFilesToCompact, we will recurse on
|
||||
* compact(). Consider the oldest files first to avoid a
|
||||
* situation where we always compact [end-threshold,end). Then, the
|
||||
* If fileSizes.size() >> maxFilesToCompact, we will recurse on
|
||||
* compact(). Consider the oldest files first to avoid a
|
||||
* situation where we always compact [end-threshold,end). Then, the
|
||||
* last file becomes an aggregate of the previous compactions.
|
||||
*/
|
||||
while(countOfFiles - start >= this.compactionThreshold &&
|
||||
fileSizes[start] >
|
||||
fileSizes[start] >
|
||||
Math.max(minCompactSize, (long)(sumSize[start+1] * r))) {
|
||||
++start;
|
||||
}
|
||||
int end = Math.min(countOfFiles, start + this.maxFilesToCompact);
|
||||
totalSize = fileSizes[start]
|
||||
totalSize = fileSizes[start]
|
||||
+ ((start+1 < countOfFiles) ? sumSize[start+1] : 0);
|
||||
|
||||
// if we don't have enough files to compact, just wait
|
||||
if (end - start < this.compactionThreshold) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Skipped compaction of " + this.storeNameStr
|
||||
+ ". Only " + (end - start) + " file(s) of size "
|
||||
+ StringUtils.humanReadableInt(totalSize)
|
||||
LOG.debug("Skipped compaction of " + this.storeNameStr
|
||||
+ ". Only " + (end - start) + " file(s) of size "
|
||||
+ StringUtils.humanReadableInt(totalSize)
|
||||
+ " are meet compaction criteria.");
|
||||
}
|
||||
return checkSplit(forceSplit);
|
||||
}
|
||||
|
||||
|
||||
if (0 == start && end == countOfFiles) {
|
||||
// we decided all the files were candidates! major compact
|
||||
majorcompaction = true;
|
||||
|
@ -728,7 +728,7 @@ public class Store implements HeapSize {
|
|||
LOG.info("Started compaction of " + filesToCompact.size() + " file(s) in " +
|
||||
this.storeNameStr + " of " + this.region.getRegionInfo().getRegionNameAsString() +
|
||||
(references? ", hasReferences=true,": " ") + " into " +
|
||||
region.getTmpDir() + ", seqid=" + maxId +
|
||||
region.getTmpDir() + ", seqid=" + maxId +
|
||||
", totalSize=" + StringUtils.humanReadableInt(totalSize));
|
||||
StoreFile.Writer writer = compact(filesToCompact, majorcompaction, maxId);
|
||||
// Move the compaction into place.
|
||||
|
@ -850,7 +850,7 @@ public class Store implements HeapSize {
|
|||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
|
||||
long getNextMajorCompactTime() {
|
||||
// default = 24hrs
|
||||
long ret = conf.getLong(HConstants.MAJOR_COMPACTION_PERIOD, 1000*60*60*24);
|
||||
|
@ -859,7 +859,7 @@ public class Store implements HeapSize {
|
|||
family.getValue(HConstants.MAJOR_COMPACTION_PERIOD);
|
||||
ret = (new Long(strCompactionTime)).longValue();
|
||||
}
|
||||
|
||||
|
||||
if (ret > 0) {
|
||||
// default = 20% = +/- 4.8 hrs
|
||||
double jitterPct = conf.getFloat("hbase.hregion.majorcompaction.jitter",
|
||||
|
@ -896,9 +896,9 @@ public class Store implements HeapSize {
|
|||
? r.getFilterEntries() : r.getEntries();
|
||||
maxKeyCount += keyCount;
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Compacting: " + file +
|
||||
"; keyCount = " + keyCount +
|
||||
"; Bloom Type = " + r.getBloomFilterType().toString() +
|
||||
LOG.debug("Compacting: " + file +
|
||||
"; keyCount = " + keyCount +
|
||||
"; Bloom Type = " + r.getBloomFilterType().toString() +
|
||||
"; Size = " + StringUtils.humanReadableInt(r.length()) );
|
||||
}
|
||||
}
|
||||
|
@ -924,7 +924,7 @@ public class Store implements HeapSize {
|
|||
ArrayList<KeyValue> kvs = new ArrayList<KeyValue>();
|
||||
while (scanner.next(kvs)) {
|
||||
if (writer == null && !kvs.isEmpty()) {
|
||||
writer = createWriterInTmp(maxKeyCount,
|
||||
writer = createWriterInTmp(maxKeyCount,
|
||||
this.compactionCompression);
|
||||
}
|
||||
if (writer != null) {
|
||||
|
@ -941,8 +941,8 @@ public class Store implements HeapSize {
|
|||
writer.close();
|
||||
fs.delete(writer.getPath(), false);
|
||||
throw new InterruptedIOException(
|
||||
"Aborting compaction of store " + this +
|
||||
" in region " + this.region +
|
||||
"Aborting compaction of store " + this +
|
||||
" in region " + this.region +
|
||||
" because user requested stop.");
|
||||
}
|
||||
}
|
||||
|
@ -1313,7 +1313,7 @@ public class Store implements HeapSize {
|
|||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
|
||||
/** @return aggregate size of all HStores used in the last compaction */
|
||||
public long getLastCompactSize() {
|
||||
return this.lastCompactSize;
|
||||
|
@ -1385,7 +1385,7 @@ public class Store implements HeapSize {
|
|||
}
|
||||
return size;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @return The priority that this store should have in the compaction queue
|
||||
*/
|
||||
|
|
Loading…
Reference in New Issue