HBASE-3290 Max Compaction Size
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1041278 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
953068adbb
commit
fc59f7d77c
|
@ -15,6 +15,7 @@ Release 0.91.0 - Unreleased
|
||||||
IMPROVEMENTS
|
IMPROVEMENTS
|
||||||
HBASE-2001 Coprocessors: Colocate user code with regions (Mingjie Lai via
|
HBASE-2001 Coprocessors: Colocate user code with regions (Mingjie Lai via
|
||||||
Andrew Purtell)
|
Andrew Purtell)
|
||||||
|
HBASE-3290 Max Compaction Size (Nicolas Spiegelberg via Stack)
|
||||||
|
|
||||||
NEW FEATURES
|
NEW FEATURES
|
||||||
HBASE-3287 Add option to cache blocks on hfile write and evict blocks on
|
HBASE-3287 Add option to cache blocks on hfile write and evict blocks on
|
||||||
|
|
|
@ -53,6 +53,8 @@ import org.apache.hadoop.hbase.util.ClassSize;
|
||||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||||
import org.apache.hadoop.util.StringUtils;
|
import org.apache.hadoop.util.StringUtils;
|
||||||
|
|
||||||
|
import com.google.common.base.Predicate;
|
||||||
|
import com.google.common.collect.Collections2;
|
||||||
import com.google.common.collect.ImmutableList;
|
import com.google.common.collect.ImmutableList;
|
||||||
import com.google.common.collect.Iterables;
|
import com.google.common.collect.Iterables;
|
||||||
|
|
||||||
|
@ -91,8 +93,10 @@ public class Store implements HeapSize {
|
||||||
// ttl in milliseconds.
|
// ttl in milliseconds.
|
||||||
protected long ttl;
|
protected long ttl;
|
||||||
private long majorCompactionTime;
|
private long majorCompactionTime;
|
||||||
|
private final int minFilesToCompact;
|
||||||
private final int maxFilesToCompact;
|
private final int maxFilesToCompact;
|
||||||
private final long minCompactSize;
|
private final long minCompactSize;
|
||||||
|
private final long maxCompactSize;
|
||||||
// compactRatio: double on purpose! Float.MAX < Long.MAX < Double.MAX
|
// compactRatio: double on purpose! Float.MAX < Long.MAX < Double.MAX
|
||||||
// With float, java will downcast your long to float for comparisons (bad)
|
// With float, java will downcast your long to float for comparisons (bad)
|
||||||
private double compactRatio;
|
private double compactRatio;
|
||||||
|
@ -119,7 +123,6 @@ public class Store implements HeapSize {
|
||||||
new CopyOnWriteArraySet<ChangedReadersObserver>();
|
new CopyOnWriteArraySet<ChangedReadersObserver>();
|
||||||
|
|
||||||
private final Object compactLock = new Object();
|
private final Object compactLock = new Object();
|
||||||
private final int compactionThreshold;
|
|
||||||
private final int blocksize;
|
private final int blocksize;
|
||||||
private final boolean blockcache;
|
private final boolean blockcache;
|
||||||
/** Compression algorithm for flush files and minor compaction */
|
/** Compression algorithm for flush files and minor compaction */
|
||||||
|
@ -177,10 +180,10 @@ public class Store implements HeapSize {
|
||||||
this.memstore = new MemStore(this.comparator);
|
this.memstore = new MemStore(this.comparator);
|
||||||
this.storeNameStr = Bytes.toString(this.family.getName());
|
this.storeNameStr = Bytes.toString(this.family.getName());
|
||||||
|
|
||||||
// By default, we compact if an HStore has more than
|
// By default, compact if storefile.count >= minFilesToCompact
|
||||||
// MIN_COMMITS_FOR_COMPACTION map files
|
this.minFilesToCompact = Math.max(2,
|
||||||
this.compactionThreshold = Math.max(2,
|
conf.getInt("hbase.hstore.compaction.min",
|
||||||
conf.getInt("hbase.hstore.compactionThreshold", 3));
|
/*old name*/ conf.getInt("hbase.hstore.compactionThreshold", 3)));
|
||||||
|
|
||||||
// Check if this is in-memory store
|
// Check if this is in-memory store
|
||||||
this.inMemory = family.isInMemory();
|
this.inMemory = family.isInMemory();
|
||||||
|
@ -198,7 +201,10 @@ public class Store implements HeapSize {
|
||||||
this.majorCompactionTime = getNextMajorCompactTime();
|
this.majorCompactionTime = getNextMajorCompactTime();
|
||||||
|
|
||||||
this.maxFilesToCompact = conf.getInt("hbase.hstore.compaction.max", 10);
|
this.maxFilesToCompact = conf.getInt("hbase.hstore.compaction.max", 10);
|
||||||
this.minCompactSize = this.region.memstoreFlushSize * 3 / 2; // +50% pad
|
this.minCompactSize = conf.getLong("hbase.hstore.compaction.min.size",
|
||||||
|
this.region.memstoreFlushSize);
|
||||||
|
this.maxCompactSize
|
||||||
|
= conf.getLong("hbase.hstore.compaction.max.size", 0);
|
||||||
this.compactRatio = conf.getFloat("hbase.hstore.compaction.ratio", 1.2F);
|
this.compactRatio = conf.getFloat("hbase.hstore.compaction.ratio", 1.2F);
|
||||||
|
|
||||||
if (Store.closeCheckInterval == 0) {
|
if (Store.closeCheckInterval == 0) {
|
||||||
|
@ -552,7 +558,7 @@ public class Store implements HeapSize {
|
||||||
// Tell listeners of the change in readers.
|
// Tell listeners of the change in readers.
|
||||||
notifyChangedReadersObservers();
|
notifyChangedReadersObservers();
|
||||||
|
|
||||||
return this.storefiles.size() >= this.compactionThreshold;
|
return this.storefiles.size() >= this.minFilesToCompact;
|
||||||
} finally {
|
} finally {
|
||||||
this.lock.writeLock().unlock();
|
this.lock.writeLock().unlock();
|
||||||
}
|
}
|
||||||
|
@ -609,129 +615,55 @@ public class Store implements HeapSize {
|
||||||
*/
|
*/
|
||||||
StoreSize compact(final boolean forceMajor) throws IOException {
|
StoreSize compact(final boolean forceMajor) throws IOException {
|
||||||
boolean forceSplit = this.region.shouldSplit(false);
|
boolean forceSplit = this.region.shouldSplit(false);
|
||||||
boolean majorcompaction = forceMajor;
|
|
||||||
synchronized (compactLock) {
|
synchronized (compactLock) {
|
||||||
this.lastCompactSize = 0;
|
this.lastCompactSize = 0; // reset first in case compaction is aborted
|
||||||
|
|
||||||
// filesToCompact are sorted oldest to newest.
|
// sanity checks
|
||||||
List<StoreFile> filesToCompact = this.storefiles;
|
for (StoreFile sf : this.storefiles) {
|
||||||
if (filesToCompact.isEmpty()) {
|
if (sf.getPath() == null || sf.getReader() == null) {
|
||||||
LOG.debug(this.storeNameStr + ": no store files to compact");
|
boolean np = sf.getPath() == null;
|
||||||
return null;
|
LOG.debug("StoreFile " + sf + " has null " + (np ? "Path":"Reader"));
|
||||||
|
return null;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check to see if we need to do a major compaction on this region.
|
// if the user wants to force a split, skip compaction unless necessary
|
||||||
// If so, change doMajorCompaction to true to skip the incremental
|
boolean references = hasReferences(this.storefiles);
|
||||||
// compacting below. Only check if doMajorCompaction is not true.
|
if (forceSplit && !forceMajor && !references) {
|
||||||
if (!majorcompaction) {
|
|
||||||
majorcompaction = isMajorCompaction(filesToCompact);
|
|
||||||
}
|
|
||||||
|
|
||||||
boolean references = hasReferences(filesToCompact);
|
|
||||||
if (!majorcompaction && !references &&
|
|
||||||
(forceSplit || (filesToCompact.size() < compactionThreshold))) {
|
|
||||||
return checkSplit(forceSplit);
|
return checkSplit(forceSplit);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* get store file sizes for incremental compacting selection.
|
Collection<StoreFile> filesToCompact
|
||||||
* normal skew:
|
= compactSelection(this.storefiles, forceMajor);
|
||||||
*
|
|
||||||
* older ----> newer
|
// empty == do not compact
|
||||||
* _
|
if (filesToCompact.isEmpty()) {
|
||||||
* | | _
|
// but do see if we need to split before returning
|
||||||
* | | | | _
|
return checkSplit(forceSplit);
|
||||||
* --|-|- |-|- |-|---_-------_------- minCompactSize
|
|
||||||
* | | | | | | | | _ | |
|
|
||||||
* | | | | | | | | | | | |
|
|
||||||
* | | | | | | | | | | | |
|
|
||||||
*/
|
|
||||||
int countOfFiles = filesToCompact.size();
|
|
||||||
long [] fileSizes = new long[countOfFiles];
|
|
||||||
long [] sumSize = new long[countOfFiles];
|
|
||||||
for (int i = countOfFiles-1; i >= 0; --i) {
|
|
||||||
StoreFile file = filesToCompact.get(i);
|
|
||||||
Path path = file.getPath();
|
|
||||||
if (path == null) {
|
|
||||||
LOG.error("Path is null for " + file);
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
StoreFile.Reader r = file.getReader();
|
|
||||||
if (r == null) {
|
|
||||||
LOG.error("StoreFile " + file + " has a null Reader");
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
fileSizes[i] = file.getReader().length();
|
|
||||||
// calculate the sum of fileSizes[i,i+maxFilesToCompact-1) for algo
|
|
||||||
int tooFar = i + this.maxFilesToCompact - 1;
|
|
||||||
sumSize[i] = fileSizes[i]
|
|
||||||
+ ((i+1 < countOfFiles) ? sumSize[i+1] : 0)
|
|
||||||
- ((tooFar < countOfFiles) ? fileSizes[tooFar] : 0);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// sum size of all files included in compaction
|
||||||
long totalSize = 0;
|
long totalSize = 0;
|
||||||
if (!majorcompaction && !references) {
|
for (StoreFile sf : filesToCompact) {
|
||||||
// we're doing a minor compaction, let's see what files are applicable
|
totalSize += sf.getReader().length();
|
||||||
int start = 0;
|
|
||||||
double r = this.compactRatio;
|
|
||||||
|
|
||||||
/* Start at the oldest file and stop when you find the first file that
|
|
||||||
* meets compaction criteria:
|
|
||||||
* (1) a recently-flushed, small file (i.e. <= minCompactSize)
|
|
||||||
* OR
|
|
||||||
* (2) within the compactRatio of sum(newer_files)
|
|
||||||
* Given normal skew, any newer files will also meet this criteria
|
|
||||||
*
|
|
||||||
* Additional Note:
|
|
||||||
* If fileSizes.size() >> maxFilesToCompact, we will recurse on
|
|
||||||
* compact(). Consider the oldest files first to avoid a
|
|
||||||
* situation where we always compact [end-threshold,end). Then, the
|
|
||||||
* last file becomes an aggregate of the previous compactions.
|
|
||||||
*/
|
|
||||||
while(countOfFiles - start >= this.compactionThreshold &&
|
|
||||||
fileSizes[start] >
|
|
||||||
Math.max(minCompactSize, (long)(sumSize[start+1] * r))) {
|
|
||||||
++start;
|
|
||||||
}
|
|
||||||
int end = Math.min(countOfFiles, start + this.maxFilesToCompact);
|
|
||||||
totalSize = fileSizes[start]
|
|
||||||
+ ((start+1 < countOfFiles) ? sumSize[start+1] : 0);
|
|
||||||
|
|
||||||
// if we don't have enough files to compact, just wait
|
|
||||||
if (end - start < this.compactionThreshold) {
|
|
||||||
if (LOG.isDebugEnabled()) {
|
|
||||||
LOG.debug("Skipped compaction of " + this.storeNameStr
|
|
||||||
+ " because only " + (end - start) + " file(s) of size "
|
|
||||||
+ StringUtils.humanReadableInt(totalSize)
|
|
||||||
+ " meet compaction criteria.");
|
|
||||||
}
|
|
||||||
return checkSplit(forceSplit);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (0 == start && end == countOfFiles) {
|
|
||||||
// we decided all the files were candidates! major compact
|
|
||||||
majorcompaction = true;
|
|
||||||
} else {
|
|
||||||
filesToCompact = new ArrayList<StoreFile>(filesToCompact.subList(start,
|
|
||||||
end));
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
// all files included in this compaction
|
|
||||||
for (long i : fileSizes) {
|
|
||||||
totalSize += i;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
this.lastCompactSize = totalSize;
|
this.lastCompactSize = totalSize;
|
||||||
|
|
||||||
|
// major compaction iff all StoreFiles are included
|
||||||
|
boolean majorcompaction
|
||||||
|
= (filesToCompact.size() == this.storefiles.size());
|
||||||
|
|
||||||
// Max-sequenceID is the last key in the files we're compacting
|
// Max-sequenceID is the last key in the files we're compacting
|
||||||
long maxId = StoreFile.getMaxSequenceIdInList(filesToCompact);
|
long maxId = StoreFile.getMaxSequenceIdInList(filesToCompact);
|
||||||
|
|
||||||
// Ready to go. Have list of files to compact.
|
// Ready to go. Have list of files to compact.
|
||||||
LOG.info("Started compaction of " + filesToCompact.size() + " file(s) in cf=" +
|
LOG.info("Started compaction of " + filesToCompact.size() + " file(s) in cf=" +
|
||||||
this.storeNameStr +
|
this.storeNameStr +
|
||||||
(references? ", hasReferences=true,": " ") + " into " +
|
(hasReferences(filesToCompact)? ", hasReferences=true,": " ") + " into " +
|
||||||
region.getTmpDir() + ", seqid=" + maxId +
|
region.getTmpDir() + ", seqid=" + maxId +
|
||||||
", totalSize=" + StringUtils.humanReadableInt(totalSize));
|
", totalSize=" + StringUtils.humanReadableInt(totalSize));
|
||||||
StoreFile.Writer writer = compact(filesToCompact, majorcompaction, maxId);
|
StoreFile.Writer writer
|
||||||
|
= compactStore(filesToCompact, majorcompaction, maxId);
|
||||||
// Move the compaction into place.
|
// Move the compaction into place.
|
||||||
StoreFile sf = completeCompaction(filesToCompact, writer);
|
StoreFile sf = completeCompaction(filesToCompact, writer);
|
||||||
if (LOG.isInfoEnabled()) {
|
if (LOG.isInfoEnabled()) {
|
||||||
|
@ -761,7 +693,8 @@ public class Store implements HeapSize {
|
||||||
boolean majorcompaction = (N == count);
|
boolean majorcompaction = (N == count);
|
||||||
|
|
||||||
// Ready to go. Have list of files to compact.
|
// Ready to go. Have list of files to compact.
|
||||||
StoreFile.Writer writer = compact(filesToCompact, majorcompaction, maxId);
|
StoreFile.Writer writer
|
||||||
|
= compactStore(filesToCompact, majorcompaction, maxId);
|
||||||
// Move the compaction into place.
|
// Move the compaction into place.
|
||||||
StoreFile sf = completeCompaction(filesToCompact, writer);
|
StoreFile sf = completeCompaction(filesToCompact, writer);
|
||||||
}
|
}
|
||||||
|
@ -820,10 +753,10 @@ public class Store implements HeapSize {
|
||||||
if (filesToCompact == null || filesToCompact.isEmpty() ||
|
if (filesToCompact == null || filesToCompact.isEmpty() ||
|
||||||
majorCompactionTime == 0) {
|
majorCompactionTime == 0) {
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
// TODO: Use better method for determining stamp of last major (HBASE-2990)
|
// TODO: Use better method for determining stamp of last major (HBASE-2990)
|
||||||
long lowTimestamp = getLowestTimestamp(fs,
|
long lowTimestamp = getLowestTimestamp(fs,
|
||||||
filesToCompact.get(0).getPath().getParent());
|
filesToCompact.get(0).getPath().getParent());
|
||||||
long now = System.currentTimeMillis();
|
long now = System.currentTimeMillis();
|
||||||
if (lowTimestamp > 0l && lowTimestamp < (now - this.majorCompactionTime)) {
|
if (lowTimestamp > 0l && lowTimestamp < (now - this.majorCompactionTime)) {
|
||||||
// Major compaction time has elapsed.
|
// Major compaction time has elapsed.
|
||||||
|
@ -842,7 +775,7 @@ public class Store implements HeapSize {
|
||||||
} else {
|
} else {
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("Major compaction triggered on store " + this.storeNameStr +
|
LOG.debug("Major compaction triggered on store " + this.storeNameStr +
|
||||||
"; time since last major compaction " + (now - lowTimestamp) + "ms");
|
"; time since last major compaction " + (now - lowTimestamp) + "ms");
|
||||||
}
|
}
|
||||||
result = true;
|
result = true;
|
||||||
this.majorCompactionTime = getNextMajorCompactTime();
|
this.majorCompactionTime = getNextMajorCompactTime();
|
||||||
|
@ -873,7 +806,149 @@ public class Store implements HeapSize {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Do a minor/major compaction. Uses the scan infrastructure to make it easy.
|
* Algorithm to choose which files to compact
|
||||||
|
*
|
||||||
|
* Configuration knobs:
|
||||||
|
* "hbase.hstore.compaction.ratio"
|
||||||
|
* normal case: minor compact when file <= sum(smaller_files) * ratio
|
||||||
|
* "hbase.hstore.compaction.min.size"
|
||||||
|
* unconditionally compact individual files below this size
|
||||||
|
* "hbase.hstore.compaction.max.size"
|
||||||
|
* never compact individual files above this size (unless splitting)
|
||||||
|
* "hbase.hstore.compaction.min"
|
||||||
|
* min files needed to minor compact
|
||||||
|
* "hbase.hstore.compaction.max"
|
||||||
|
* max files to compact at once (avoids OOM)
|
||||||
|
*
|
||||||
|
* @param candidates candidate files, ordered from oldest to newest
|
||||||
|
* @param majorcompaction whether to force a major compaction
|
||||||
|
* @return subset copy of candidate list that meets compaction criteria
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
List<StoreFile> compactSelection(List<StoreFile> candidates,
|
||||||
|
boolean forcemajor) throws IOException {
|
||||||
|
/* normal skew:
|
||||||
|
*
|
||||||
|
* older ----> newer
|
||||||
|
* _
|
||||||
|
* | | _
|
||||||
|
* | | | | _
|
||||||
|
* --|-|- |-|- |-|---_-------_------- minCompactSize
|
||||||
|
* | | | | | | | | _ | |
|
||||||
|
* | | | | | | | | | | | |
|
||||||
|
* | | | | | | | | | | | |
|
||||||
|
*/
|
||||||
|
List<StoreFile> filesToCompact = new ArrayList<StoreFile>(candidates);
|
||||||
|
|
||||||
|
// Do not compact files above a configurable max filesize unless they are
|
||||||
|
// references. We MUST compact these
|
||||||
|
if (this.maxCompactSize > 0) {
|
||||||
|
final long msize = this.maxCompactSize;
|
||||||
|
filesToCompact.removeAll(Collections2.filter(filesToCompact,
|
||||||
|
new Predicate<StoreFile>() {
|
||||||
|
public boolean apply(StoreFile sf) {
|
||||||
|
// NOTE: keep all references. we must compact them
|
||||||
|
return sf.getReader().length() > msize && !sf.isReference();
|
||||||
|
}
|
||||||
|
}));
|
||||||
|
}
|
||||||
|
|
||||||
|
// major compact on user action or age (caveat: we have too many files)
|
||||||
|
boolean majorcompaction = forcemajor ||
|
||||||
|
(isMajorCompaction(filesToCompact) &&
|
||||||
|
filesToCompact.size() > this.maxFilesToCompact);
|
||||||
|
|
||||||
|
if (filesToCompact.isEmpty()) {
|
||||||
|
LOG.debug(this.storeNameStr + ": no store files to compact");
|
||||||
|
return filesToCompact;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!majorcompaction && !hasReferences(filesToCompact)) {
|
||||||
|
// we're doing a minor compaction, let's see what files are applicable
|
||||||
|
int start = 0;
|
||||||
|
double r = this.compactRatio;
|
||||||
|
|
||||||
|
// Sort files by size to correct when normal skew is altered by bulk load.
|
||||||
|
//
|
||||||
|
// So, technically, order is important for optimizations like the TimeStamp
|
||||||
|
// filter. However, realistically this isn't a problem because our normal
|
||||||
|
// skew always decreases in filesize over time. The only place where our
|
||||||
|
// skew doesn't decrease is for files that have been recently flushed.
|
||||||
|
// However, all those will be unconditionally compacted because they will
|
||||||
|
// be lower than "hbase.hstore.compaction.min.size".
|
||||||
|
//
|
||||||
|
// The sorting is to handle an interesting issue that popped up for us
|
||||||
|
// during migration: we're bulk loading StoreFiles of extremely variable
|
||||||
|
// size (are we migrating 1k users or 10M?) and they will all appear at
|
||||||
|
// the end of the StoreFile list. How do we determine when it is
|
||||||
|
// efficient to compact them? The easiest option was to sort the compact
|
||||||
|
// list and handle bulk files by relative size instead of making some
|
||||||
|
// custom compaction selection algorithm just for bulk inclusion. It
|
||||||
|
// seems like any other companies that will incrementally migrate data
|
||||||
|
// into HBase would hit the same issue. Nicolas.
|
||||||
|
//
|
||||||
|
Collections.sort(filesToCompact, StoreFile.Comparators.FILE_SIZE);
|
||||||
|
|
||||||
|
// get store file sizes for incremental compacting selection.
|
||||||
|
int countOfFiles = filesToCompact.size();
|
||||||
|
long [] fileSizes = new long[countOfFiles];
|
||||||
|
long [] sumSize = new long[countOfFiles];
|
||||||
|
for (int i = countOfFiles-1; i >= 0; --i) {
|
||||||
|
StoreFile file = filesToCompact.get(i);
|
||||||
|
fileSizes[i] = file.getReader().length();
|
||||||
|
// calculate the sum of fileSizes[i,i+maxFilesToCompact-1) for algo
|
||||||
|
int tooFar = i + this.maxFilesToCompact - 1;
|
||||||
|
sumSize[i] = fileSizes[i]
|
||||||
|
+ ((i+1 < countOfFiles) ? sumSize[i+1] : 0)
|
||||||
|
- ((tooFar < countOfFiles) ? fileSizes[tooFar] : 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Start at the oldest file and stop when you find the first file that
|
||||||
|
* meets compaction criteria:
|
||||||
|
* (1) a recently-flushed, small file (i.e. <= minCompactSize)
|
||||||
|
* OR
|
||||||
|
* (2) within the compactRatio of sum(newer_files)
|
||||||
|
* Given normal skew, any newer files will also meet this criteria
|
||||||
|
*
|
||||||
|
* Additional Note:
|
||||||
|
* If fileSizes.size() >> maxFilesToCompact, we will recurse on
|
||||||
|
* compact(). Consider the oldest files first to avoid a
|
||||||
|
* situation where we always compact [end-threshold,end). Then, the
|
||||||
|
* last file becomes an aggregate of the previous compactions.
|
||||||
|
*/
|
||||||
|
while(countOfFiles - start >= this.minFilesToCompact &&
|
||||||
|
fileSizes[start] >
|
||||||
|
Math.max(minCompactSize, (long)(sumSize[start+1] * r))) {
|
||||||
|
++start;
|
||||||
|
}
|
||||||
|
int end = Math.min(countOfFiles, start + this.maxFilesToCompact);
|
||||||
|
long totalSize = fileSizes[start]
|
||||||
|
+ ((start+1 < countOfFiles) ? sumSize[start+1] : 0);
|
||||||
|
filesToCompact = filesToCompact.subList(start, end);
|
||||||
|
|
||||||
|
// if we don't have enough files to compact, just wait
|
||||||
|
if (filesToCompact.size() < this.minFilesToCompact) {
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("Skipped compaction of " + this.storeNameStr
|
||||||
|
+ ". Only " + (end - start) + " file(s) of size "
|
||||||
|
+ StringUtils.humanReadableInt(totalSize)
|
||||||
|
+ " have met compaction criteria.");
|
||||||
|
}
|
||||||
|
return Collections.emptyList();
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// all files included in this compaction, up to max
|
||||||
|
if (filesToCompact.size() > this.maxFilesToCompact) {
|
||||||
|
int pastMax = filesToCompact.size() - this.maxFilesToCompact;
|
||||||
|
filesToCompact.subList(0, pastMax).clear();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return filesToCompact;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Do a minor/major compaction on an explicit set of storefiles in a Store.
|
||||||
|
* Uses the scan infrastructure to make it easy.
|
||||||
*
|
*
|
||||||
* @param filesToCompact which files to compact
|
* @param filesToCompact which files to compact
|
||||||
* @param majorCompaction true to major compact (prune all deletes, max versions, etc)
|
* @param majorCompaction true to major compact (prune all deletes, max versions, etc)
|
||||||
|
@ -882,7 +957,7 @@ public class Store implements HeapSize {
|
||||||
* nothing made it through the compaction.
|
* nothing made it through the compaction.
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
private StoreFile.Writer compact(final List<StoreFile> filesToCompact,
|
private StoreFile.Writer compactStore(final Collection<StoreFile> filesToCompact,
|
||||||
final boolean majorCompaction, final long maxId)
|
final boolean majorCompaction, final long maxId)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
// calculate maximum key count after compaction (for blooms)
|
// calculate maximum key count after compaction (for blooms)
|
||||||
|
@ -987,7 +1062,7 @@ public class Store implements HeapSize {
|
||||||
* @return StoreFile created. May be null.
|
* @return StoreFile created. May be null.
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
private StoreFile completeCompaction(final List<StoreFile> compactedFiles,
|
private StoreFile completeCompaction(final Collection<StoreFile> compactedFiles,
|
||||||
final StoreFile.Writer compactedFile)
|
final StoreFile.Writer compactedFile)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
// 1. Moving the new files into place -- if there is a new file (may not
|
// 1. Moving the new files into place -- if there is a new file (may not
|
||||||
|
@ -1521,15 +1596,15 @@ public class Store implements HeapSize {
|
||||||
/**
|
/**
|
||||||
* See if there's too much store files in this store
|
* See if there's too much store files in this store
|
||||||
* @return true if number of store files is greater than
|
* @return true if number of store files is greater than
|
||||||
* the number defined in compactionThreshold
|
* the number defined in minFilesToCompact
|
||||||
*/
|
*/
|
||||||
public boolean hasTooManyStoreFiles() {
|
public boolean hasTooManyStoreFiles() {
|
||||||
return this.storefiles.size() > this.compactionThreshold;
|
return this.storefiles.size() > this.minFilesToCompact;
|
||||||
}
|
}
|
||||||
|
|
||||||
public static final long FIXED_OVERHEAD = ClassSize.align(
|
public static final long FIXED_OVERHEAD = ClassSize.align(
|
||||||
ClassSize.OBJECT + (15 * ClassSize.REFERENCE) +
|
ClassSize.OBJECT + (15 * ClassSize.REFERENCE) +
|
||||||
(6 * Bytes.SIZEOF_LONG) + (1 * Bytes.SIZEOF_DOUBLE) +
|
(7 * Bytes.SIZEOF_LONG) + (1 * Bytes.SIZEOF_DOUBLE) +
|
||||||
(4 * Bytes.SIZEOF_INT) + (Bytes.SIZEOF_BOOLEAN * 2));
|
(4 * Bytes.SIZEOF_INT) + (Bytes.SIZEOF_BOOLEAN * 2));
|
||||||
|
|
||||||
public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD +
|
public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD +
|
||||||
|
|
|
@ -26,6 +26,7 @@ import java.lang.management.MemoryUsage;
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
import java.text.NumberFormat;
|
import java.text.NumberFormat;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
|
import java.util.Collection;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.Comparator;
|
import java.util.Comparator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
@ -303,7 +304,7 @@ public class StoreFile {
|
||||||
* @return 0 if no non-bulk-load files are provided or, this is Store that
|
* @return 0 if no non-bulk-load files are provided or, this is Store that
|
||||||
* does not yet have any store files.
|
* does not yet have any store files.
|
||||||
*/
|
*/
|
||||||
public static long getMaxSequenceIdInList(List<StoreFile> sfs) {
|
public static long getMaxSequenceIdInList(Collection<StoreFile> sfs) {
|
||||||
long max = 0;
|
long max = 0;
|
||||||
for (StoreFile sf : sfs) {
|
for (StoreFile sf : sfs) {
|
||||||
if (!sf.isBulkLoadResult()) {
|
if (!sf.isBulkLoadResult()) {
|
||||||
|
@ -909,6 +910,13 @@ public class StoreFile {
|
||||||
bloomFilterType = BloomType.NONE;
|
bloomFilterType = BloomType.NONE;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* ONLY USE DEFAULT CONSTRUCTOR FOR UNIT TESTS
|
||||||
|
*/
|
||||||
|
Reader() {
|
||||||
|
this.reader = null;
|
||||||
|
}
|
||||||
|
|
||||||
public RawComparator<byte []> getComparator() {
|
public RawComparator<byte []> getComparator() {
|
||||||
return reader.getComparator();
|
return reader.getComparator();
|
||||||
}
|
}
|
||||||
|
@ -1132,5 +1140,15 @@ public class StoreFile {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* FILE_SIZE = descending sort StoreFiles (largest --> smallest in size)
|
||||||
|
*/
|
||||||
|
static final Comparator<StoreFile> FILE_SIZE =
|
||||||
|
Ordering.natural().reverse().onResultOf(new Function<StoreFile, Long>() {
|
||||||
|
@Override
|
||||||
|
public Long apply(StoreFile sf) {
|
||||||
|
return sf.getReader().length();
|
||||||
|
}
|
||||||
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,208 @@
|
||||||
|
/**
|
||||||
|
* Copyright 2010 The Apache Software Foundation
|
||||||
|
*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.regionserver;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import junit.framework.TestCase;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||||
|
import org.apache.hadoop.hbase.HColumnDescriptor;
|
||||||
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
|
import org.apache.hadoop.hbase.HRegionInfo;
|
||||||
|
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.StoreFile.Reader;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.wal.HLog;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.wal.TestWALReplay;
|
||||||
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
|
||||||
|
import com.google.common.collect.Lists;
|
||||||
|
|
||||||
|
public class TestCompactSelection extends TestCase {
|
||||||
|
private final static Log LOG = LogFactory.getLog(TestCompactSelection.class);
|
||||||
|
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
|
||||||
|
|
||||||
|
private Configuration conf;
|
||||||
|
private Store store;
|
||||||
|
private static final String DIR
|
||||||
|
= HBaseTestingUtility.getTestDir() + "/TestCompactSelection/";
|
||||||
|
|
||||||
|
private static final int minFiles = 3;
|
||||||
|
private static final int maxFiles = 5;
|
||||||
|
|
||||||
|
private static final long minSize = 10;
|
||||||
|
private static final long maxSize = 1000;
|
||||||
|
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setUp() throws Exception {
|
||||||
|
// setup config values necessary for store
|
||||||
|
this.conf = TEST_UTIL.getConfiguration();
|
||||||
|
this.conf.setLong(HConstants.MAJOR_COMPACTION_PERIOD, 0);
|
||||||
|
this.conf.setInt("hbase.hstore.compaction.min", minFiles);
|
||||||
|
this.conf.setInt("hbase.hstore.compaction.max", maxFiles);
|
||||||
|
this.conf.setLong("hbase.hregion.memstore.flush.size", minSize);
|
||||||
|
this.conf.setLong("hbase.hstore.compaction.max.size", maxSize);
|
||||||
|
this.conf.setFloat("hbase.hstore.compaction.ratio", 1.0F);
|
||||||
|
|
||||||
|
//Setting up a Store
|
||||||
|
Path basedir = new Path(DIR);
|
||||||
|
Path logdir = new Path(DIR+"/logs");
|
||||||
|
Path oldLogDir = new Path(basedir, HConstants.HREGION_OLDLOGDIR_NAME);
|
||||||
|
HColumnDescriptor hcd = new HColumnDescriptor(Bytes.toBytes("family"));
|
||||||
|
FileSystem fs = FileSystem.get(conf);
|
||||||
|
|
||||||
|
fs.delete(logdir, true);
|
||||||
|
|
||||||
|
HTableDescriptor htd = new HTableDescriptor(Bytes.toBytes("table"));
|
||||||
|
htd.addFamily(hcd);
|
||||||
|
HRegionInfo info = new HRegionInfo(htd, null, null, false);
|
||||||
|
HLog hlog = new HLog(fs, logdir, oldLogDir, conf);
|
||||||
|
HRegion region = new HRegion(basedir, hlog, fs, conf, info, null);
|
||||||
|
|
||||||
|
store = new Store(basedir, region, hcd, fs, conf);
|
||||||
|
}
|
||||||
|
|
||||||
|
// used so our tests don't deal with actual StoreFiles
|
||||||
|
static class MockStoreFile extends StoreFile {
|
||||||
|
long length = 0;
|
||||||
|
boolean isRef = false;
|
||||||
|
|
||||||
|
MockStoreFile(long length, boolean isRef) throws IOException {
|
||||||
|
super(TEST_UTIL.getTestFileSystem(), new Path("_"), false,
|
||||||
|
TEST_UTIL.getConfiguration(), BloomType.NONE, false);
|
||||||
|
this.length = length;
|
||||||
|
this.isRef = isRef;
|
||||||
|
}
|
||||||
|
|
||||||
|
void setLength(long newLen) {
|
||||||
|
this.length = newLen;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
boolean isMajorCompaction() {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
boolean isReference() {
|
||||||
|
return this.isRef;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public StoreFile.Reader getReader() {
|
||||||
|
final long len = this.length;
|
||||||
|
return new StoreFile.Reader() {
|
||||||
|
@Override
|
||||||
|
public long length() {
|
||||||
|
return len;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
List<StoreFile> sfCreate(long ... sizes) throws IOException {
|
||||||
|
return sfCreate(false, sizes);
|
||||||
|
}
|
||||||
|
|
||||||
|
List<StoreFile> sfCreate(boolean isReference, long ... sizes)
|
||||||
|
throws IOException {
|
||||||
|
List<StoreFile> ret = Lists.newArrayList();
|
||||||
|
for (long i : sizes) {
|
||||||
|
ret.add(new MockStoreFile(i, isReference));
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
void compactEquals(List<StoreFile> actual, long ... expected)
|
||||||
|
throws IOException {
|
||||||
|
compactEquals(actual, false, expected);
|
||||||
|
}
|
||||||
|
|
||||||
|
void compactEquals(List<StoreFile> actual, boolean forcemajor,
|
||||||
|
long ... expected)
|
||||||
|
throws IOException {
|
||||||
|
List<StoreFile> result = store.compactSelection(actual, forcemajor);
|
||||||
|
long[] aNums = new long[result.size()];
|
||||||
|
for (int i=0; i <result.size(); ++i) {
|
||||||
|
aNums[i] = result.get(i).getReader().length();
|
||||||
|
}
|
||||||
|
assertEquals(Arrays.toString(expected), Arrays.toString(aNums));
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testCompactionRatio() throws IOException {
|
||||||
|
/*
|
||||||
|
* NOTE: these tests are specific to describe the implementation of the
|
||||||
|
* current compaction algorithm. Developed to ensure that refactoring
|
||||||
|
* doesn't implicitly alter this.
|
||||||
|
*/
|
||||||
|
long tooBig = maxSize + 1;
|
||||||
|
|
||||||
|
// default case. preserve user ratio on size
|
||||||
|
compactEquals(sfCreate(100,50,23,12,12), 23, 12, 12);
|
||||||
|
// less than compact threshold = don't compact
|
||||||
|
compactEquals(sfCreate(100,50,25,12,12) /* empty */);
|
||||||
|
// greater than compact size = skip those
|
||||||
|
compactEquals(sfCreate(tooBig, tooBig, 700, 700, 700), 700, 700, 700);
|
||||||
|
// big size + threshold
|
||||||
|
compactEquals(sfCreate(tooBig, tooBig, 700,700) /* empty */);
|
||||||
|
// small files = don't care about ratio
|
||||||
|
compactEquals(sfCreate(8,3,1), 8,3,1);
|
||||||
|
// sort first so you don't include huge file the tail end
|
||||||
|
// happens with HFileOutputFormat bulk migration
|
||||||
|
compactEquals(sfCreate(100,50,23,12,12, 500), 23, 12, 12);
|
||||||
|
// don't exceed max file compact threshold
|
||||||
|
assertEquals(maxFiles,
|
||||||
|
store.compactSelection(sfCreate(7,6,5,4,3,2,1), false).size());
|
||||||
|
|
||||||
|
/* MAJOR COMPACTION */
|
||||||
|
// if a major compaction has been forced, then compact everything
|
||||||
|
compactEquals(sfCreate(100,50,25,12,12), true, 100, 50, 25, 12, 12);
|
||||||
|
// also choose files < threshold on major compaction
|
||||||
|
compactEquals(sfCreate(12,12), true, 12, 12);
|
||||||
|
// unless one of those files is too big
|
||||||
|
compactEquals(sfCreate(tooBig, 12,12), true, 12, 12);
|
||||||
|
// don't exceed max file compact threshold, even with major compaction
|
||||||
|
assertEquals(maxFiles,
|
||||||
|
store.compactSelection(sfCreate(7,6,5,4,3,2,1), true).size());
|
||||||
|
|
||||||
|
/* REFERENCES == file is from a region that was split */
|
||||||
|
// treat storefiles that have references like a major compaction
|
||||||
|
compactEquals(sfCreate(true, 100,50,25,12,12), true, 100, 50, 25, 12, 12);
|
||||||
|
// reference files shouldn't obey max threshold
|
||||||
|
compactEquals(sfCreate(true, tooBig, 12,12), true, tooBig, 12, 12);
|
||||||
|
// reference files should obey max file compact to avoid OOM
|
||||||
|
assertEquals(maxFiles,
|
||||||
|
store.compactSelection(sfCreate(true, 7,6,5,4,3,2,1), true).size());
|
||||||
|
|
||||||
|
// empty case
|
||||||
|
compactEquals(new ArrayList<StoreFile>() /* empty */);
|
||||||
|
// empty case (because all files are too big)
|
||||||
|
compactEquals(sfCreate(tooBig, tooBig) /* empty */);
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue