HBASE-7110 refactor the compaction selection and config code similarly to 0.89-fb changes; REVERT of original patch and ADDENDUM because applied old patch originally, v8

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1414000 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Michael Stack 2012-11-27 05:23:14 +00:00
parent 5d19fcfd83
commit 8e49c2a45d
12 changed files with 440 additions and 1187 deletions

View File

@ -4172,7 +4172,7 @@ public class HRegion implements HeapSize { // , Writable{
} }
/** /**
* @return True if needs a major compaction. * @return True if needs a mojor compaction.
* @throws IOException * @throws IOException
*/ */
boolean isMajorCompaction() throws IOException { boolean isMajorCompaction() throws IOException {

View File

@ -19,7 +19,6 @@
package org.apache.hadoop.hbase.regionserver; package org.apache.hadoop.hbase.regionserver;
import java.io.IOException; import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
@ -64,7 +63,9 @@ import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.io.hfile.InvalidHFileException; import org.apache.hadoop.hbase.io.hfile.InvalidHFileException;
import org.apache.hadoop.hbase.io.hfile.NoOpDataBlockEncoder; import org.apache.hadoop.hbase.io.hfile.NoOpDataBlockEncoder;
import org.apache.hadoop.hbase.monitoring.MonitoredTask; import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.regionserver.compactions.*; import org.apache.hadoop.hbase.regionserver.compactions.CompactSelection;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ChecksumType; import org.apache.hadoop.hbase.util.ChecksumType;
import org.apache.hadoop.hbase.util.ClassSize; import org.apache.hadoop.hbase.util.ClassSize;
@ -74,6 +75,8 @@ import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.collect.Collections2;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
@ -100,9 +103,8 @@ import com.google.common.collect.Lists;
* <p>Locking and transactions are handled at a higher level. This API should * <p>Locking and transactions are handled at a higher level. This API should
* not be called directly but by an HRegion manager. * not be called directly but by an HRegion manager.
*/ */
//TODO: move StoreConfiguration implementation into a separate class.
@InterfaceAudience.Private @InterfaceAudience.Private
public class HStore implements Store, StoreConfiguration { public class HStore implements Store {
static final Log LOG = LogFactory.getLog(HStore.class); static final Log LOG = LogFactory.getLog(HStore.class);
protected final MemStore memstore; protected final MemStore memstore;
@ -110,12 +112,15 @@ public class HStore implements Store, StoreConfiguration {
private final Path homedir; private final Path homedir;
private final HRegion region; private final HRegion region;
private final HColumnDescriptor family; private final HColumnDescriptor family;
CompactionPolicy compactionPolicy;
final FileSystem fs; final FileSystem fs;
final Configuration conf; final Configuration conf;
final CacheConfig cacheConf; final CacheConfig cacheConf;
// ttl in milliseconds. TODO: can this be removed? Already stored in scanInfo. // ttl in milliseconds.
private long ttl; private long ttl;
private final int minFilesToCompact;
private final int maxFilesToCompact;
private final long minCompactSize;
private final long maxCompactSize;
private long lastCompactSize = 0; private long lastCompactSize = 0;
volatile boolean forceMajor = false; volatile boolean forceMajor = false;
/* how many bytes to write between status checks */ /* how many bytes to write between status checks */
@ -188,7 +193,7 @@ public class HStore implements Store, StoreConfiguration {
this.comparator = info.getComparator(); this.comparator = info.getComparator();
// Get TTL // Get TTL
this.ttl = determineTTLFromFamily(family); this.ttl = getTTL(family);
// used by ScanQueryMatcher // used by ScanQueryMatcher
long timeToPurgeDeletes = long timeToPurgeDeletes =
Math.max(conf.getLong("hbase.hstore.time.to.purge.deletes", 0), 0); Math.max(conf.getLong("hbase.hstore.time.to.purge.deletes", 0), 0);
@ -199,11 +204,23 @@ public class HStore implements Store, StoreConfiguration {
scanInfo = new ScanInfo(family, ttl, timeToPurgeDeletes, this.comparator); scanInfo = new ScanInfo(family, ttl, timeToPurgeDeletes, this.comparator);
this.memstore = new MemStore(conf, this.comparator); this.memstore = new MemStore(conf, this.comparator);
// By default, compact if storefile.count >= minFilesToCompact
this.minFilesToCompact = Math.max(2,
conf.getInt("hbase.hstore.compaction.min",
/*old name*/ conf.getInt("hbase.hstore.compactionThreshold", 3)));
LOG.info("hbase.hstore.compaction.min = " + this.minFilesToCompact);
// Setting up cache configuration for this family // Setting up cache configuration for this family
this.cacheConf = new CacheConfig(conf, family); this.cacheConf = new CacheConfig(conf, family);
this.blockingStoreFileCount = this.blockingStoreFileCount =
conf.getInt("hbase.hstore.blockingStoreFiles", 7); conf.getInt("hbase.hstore.blockingStoreFiles", 7);
this.maxFilesToCompact = conf.getInt("hbase.hstore.compaction.max", 10);
this.minCompactSize = conf.getLong("hbase.hstore.compaction.min.size",
this.region.memstoreFlushSize);
this.maxCompactSize
= conf.getLong("hbase.hstore.compaction.max.size", Long.MAX_VALUE);
this.verifyBulkLoads = conf.getBoolean("hbase.hstore.bulkload.verify", false); this.verifyBulkLoads = conf.getBoolean("hbase.hstore.bulkload.verify", false);
if (HStore.closeCheckInterval == 0) { if (HStore.closeCheckInterval == 0) {
@ -217,16 +234,14 @@ public class HStore implements Store, StoreConfiguration {
// initilize bytes per checksum // initilize bytes per checksum
this.bytesPerChecksum = getBytesPerChecksum(conf); this.bytesPerChecksum = getBytesPerChecksum(conf);
// Create a compaction tool instance // Create a compaction tool instance
this.compactor = new Compactor(conf); this.compactor = new Compactor(this.conf);
// Create a compaction manager.
this.compactionPolicy = new CompactionPolicy(conf, this);
} }
/** /**
* @param family * @param family
* @return * @return
*/ */
private static long determineTTLFromFamily(final HColumnDescriptor family) { long getTTL(final HColumnDescriptor family) {
// HCD.getTimeToLive returns ttl in seconds. Convert to milliseconds. // HCD.getTimeToLive returns ttl in seconds. Convert to milliseconds.
long ttl = family.getTimeToLive(); long ttl = family.getTimeToLive();
if (ttl == HConstants.FOREVER) { if (ttl == HConstants.FOREVER) {
@ -270,22 +285,6 @@ public class HStore implements Store, StoreConfiguration {
return this.fs; return this.fs;
} }
/* Implementation of StoreConfiguration */
public long getStoreFileTtl() {
// TTL only applies if there's no MIN_VERSIONs setting on the column.
return (this.scanInfo.getMinVersions() == 0) ? this.ttl : Long.MAX_VALUE;
}
public Long getMajorCompactionPeriod() {
String strCompactionTime = this.family.getValue(HConstants.MAJOR_COMPACTION_PERIOD);
return (strCompactionTime != null) ? new Long(strCompactionTime) : null;
}
public long getMemstoreFlushSize() {
return this.region.memstoreFlushSize;
}
/* End implementation of StoreConfiguration */
/** /**
* Returns the configured bytesPerChecksum value. * Returns the configured bytesPerChecksum value.
* @param conf The configuration * @param conf The configuration
@ -353,8 +352,7 @@ public class HStore implements Store, StoreConfiguration {
* @param family family name of this store * @param family family name of this store
* @return Path to the family/Store home directory * @return Path to the family/Store home directory
*/ */
public static Path getStoreHomedir(final Path parentRegionDirectory, public static Path getStoreHomedir(final Path parentRegionDirectory, final byte[] family) {
final byte[] family) {
return new Path(parentRegionDirectory, new Path(Bytes.toString(family))); return new Path(parentRegionDirectory, new Path(Bytes.toString(family)));
} }
/** /**
@ -568,8 +566,7 @@ public class HStore implements Store, StoreConfiguration {
"the destination store. Copying file over to destination filesystem."); "the destination store. Copying file over to destination filesystem.");
Path tmpPath = getTmpPath(); Path tmpPath = getTmpPath();
FileUtil.copy(srcFs, srcPath, fs, tmpPath, false, conf); FileUtil.copy(srcFs, srcPath, fs, tmpPath, false, conf);
LOG.info("Copied " + srcPath LOG.info("Copied " + srcPath + " to temporary path on destination filesystem: " + tmpPath);
+ " to temporary path on destination filesystem: " + tmpPath);
srcPath = tmpPath; srcPath = tmpPath;
} }
@ -666,8 +663,8 @@ public class HStore implements Store, StoreConfiguration {
/** /**
* Snapshot this stores memstore. Call before running * Snapshot this stores memstore. Call before running
* {@link #flushCache(long, SortedSet, TimeRangeTracker, AtomicLong, MonitoredTask)} * {@link #flushCache(long, SortedSet, TimeRangeTracker, AtomicLong, MonitoredTask)} so it has
* so it has some work to do. * some work to do.
*/ */
void snapshot() { void snapshot() {
this.memstore.snapshot(); this.memstore.snapshot();
@ -725,8 +722,7 @@ public class HStore implements Store, StoreConfiguration {
InternalScanner scanner = null; InternalScanner scanner = null;
KeyValueScanner memstoreScanner = new CollectionBackedScanner(set, this.comparator); KeyValueScanner memstoreScanner = new CollectionBackedScanner(set, this.comparator);
if (getHRegion().getCoprocessorHost() != null) { if (getHRegion().getCoprocessorHost() != null) {
scanner = getHRegion().getCoprocessorHost() scanner = getHRegion().getCoprocessorHost().preFlushScannerOpen(this, memstoreScanner);
.preFlushScannerOpen(this, memstoreScanner);
} }
if (scanner == null) { if (scanner == null) {
Scan scan = new Scan(); Scan scan = new Scan();
@ -763,8 +759,7 @@ public class HStore implements Store, StoreConfiguration {
if (!kvs.isEmpty()) { if (!kvs.isEmpty()) {
for (KeyValue kv : kvs) { for (KeyValue kv : kvs) {
// If we know that this KV is going to be included always, then let us // If we know that this KV is going to be included always, then let us
// set its memstoreTS to 0. This will help us save space when writing to // set its memstoreTS to 0. This will help us save space when writing to disk.
// disk.
if (kv.getMemstoreTS() <= smallestReadPoint) { if (kv.getMemstoreTS() <= smallestReadPoint) {
// let us not change the original KV. It could be in the memstore // let us not change the original KV. It could be in the memstore
// changing its memstoreTS could affect other threads/scanners. // changing its memstoreTS could affect other threads/scanners.
@ -779,8 +774,7 @@ public class HStore implements Store, StoreConfiguration {
} while (hasMore); } while (hasMore);
} finally { } finally {
// Write out the log sequence number that corresponds to this output // Write out the log sequence number that corresponds to this output
// hfile. Also write current time in metadata as minFlushTime. // hfile. The hfile is current up to and including logCacheFlushId.
// The hfile is current up to and including logCacheFlushId.
status.setStatus("Flushing " + this + ": appending metadata"); status.setStatus("Flushing " + this + ": appending metadata");
writer.appendMetadata(logCacheFlushId, false); writer.appendMetadata(logCacheFlushId, false);
status.setStatus("Flushing " + this + ": closing flushed file"); status.setStatus("Flushing " + this + ": closing flushed file");
@ -1010,12 +1004,12 @@ public class HStore implements Store, StoreConfiguration {
// Ready to go. Have list of files to compact. // Ready to go. Have list of files to compact.
LOG.info("Starting compaction of " + filesToCompact.size() + " file(s) in " LOG.info("Starting compaction of " + filesToCompact.size() + " file(s) in "
+ this + " of " + this.region.getRegionInfo().getRegionNameAsString() + this + " of "
+ this.region.getRegionInfo().getRegionNameAsString()
+ " into tmpdir=" + region.getTmpDir() + ", seqid=" + maxId + ", totalSize=" + " into tmpdir=" + region.getTmpDir() + ", seqid=" + maxId + ", totalSize="
+ StringUtils.humanReadableInt(cr.getSize())); + StringUtils.humanReadableInt(cr.getSize()));
StoreFile sf = null; StoreFile sf = null;
long compactionStartTime = EnvironmentEdgeManager.currentTimeMillis();
try { try {
StoreFile.Writer writer = StoreFile.Writer writer =
this.compactor.compact(this, filesToCompact, cr.isMajor(), maxId); this.compactor.compact(this, filesToCompact, cr.isMajor(), maxId);
@ -1037,7 +1031,6 @@ public class HStore implements Store, StoreConfiguration {
} }
} }
long now = EnvironmentEdgeManager.currentTimeMillis();
LOG.info("Completed" + (cr.isMajor() ? " major " : " ") + "compaction of " LOG.info("Completed" + (cr.isMajor() ? " major " : " ") + "compaction of "
+ filesToCompact.size() + " file(s) in " + this + " of " + filesToCompact.size() + " file(s) in " + this + " of "
+ this.region.getRegionInfo().getRegionNameAsString() + this.region.getRegionInfo().getRegionNameAsString()
@ -1045,11 +1038,8 @@ public class HStore implements Store, StoreConfiguration {
(sf == null ? "none" : sf.getPath().getName()) + (sf == null ? "none" : sf.getPath().getName()) +
", size=" + (sf == null ? "none" : ", size=" + (sf == null ? "none" :
StringUtils.humanReadableInt(sf.getReader().length())) StringUtils.humanReadableInt(sf.getReader().length()))
+ "; total size for store is " + StringUtils.humanReadableInt(storeSize) + "; total size for store is "
+ ". This selection was in queue for " + StringUtils.humanReadableInt(storeSize));
+ StringUtils.formatTimeDiff(compactionStartTime, cr.getSelectionTime())
+ ", and took " + StringUtils.formatTimeDiff(now, compactionStartTime)
+ " to execute.");
return sf; return sf;
} }
@ -1104,7 +1094,38 @@ public class HStore implements Store, StoreConfiguration {
@Override @Override
public boolean hasReferences() { public boolean hasReferences() {
return StoreUtils.hasReferences(this.storefiles); return hasReferences(this.storefiles);
}
/*
* @param files
* @return True if any of the files in <code>files</code> are References.
*/
private boolean hasReferences(Collection<StoreFile> files) {
if (files != null && files.size() > 0) {
for (StoreFile hsf: files) {
if (hsf.isReference()) {
return true;
}
}
}
return false;
}
/*
* Gets lowest timestamp from candidate StoreFiles
*
* @param fs
* @param dir
* @throws IOException
*/
public static long getLowestTimestamp(final List<StoreFile> candidates)
throws IOException {
long minTs = Long.MAX_VALUE;
for (StoreFile storeFile : candidates) {
minTs = Math.min(minTs, storeFile.getModificationTimeStamp());
}
return minTs;
} }
@Override @Override
@ -1122,7 +1143,91 @@ public class HStore implements Store, StoreConfiguration {
} }
List<StoreFile> candidates = new ArrayList<StoreFile>(this.storefiles); List<StoreFile> candidates = new ArrayList<StoreFile>(this.storefiles);
return compactionPolicy.isMajorCompaction(candidates);
// exclude files above the max compaction threshold
// except: save all references. we MUST compact them
int pos = 0;
while (pos < candidates.size() &&
candidates.get(pos).getReader().length() > this.maxCompactSize &&
!candidates.get(pos).isReference()) ++pos;
candidates.subList(0, pos).clear();
return isMajorCompaction(candidates);
}
/*
* @param filesToCompact Files to compact. Can be null.
* @return True if we should run a major compaction.
*/
private boolean isMajorCompaction(final List<StoreFile> filesToCompact) throws IOException {
boolean result = false;
long mcTime = getNextMajorCompactTime();
if (filesToCompact == null || filesToCompact.isEmpty() || mcTime == 0) {
return result;
}
// TODO: Use better method for determining stamp of last major (HBASE-2990)
long lowTimestamp = getLowestTimestamp(filesToCompact);
long now = System.currentTimeMillis();
if (lowTimestamp > 0l && lowTimestamp < (now - mcTime)) {
// Major compaction time has elapsed.
if (filesToCompact.size() == 1) {
// Single file
StoreFile sf = filesToCompact.get(0);
long oldest =
(sf.getReader().timeRangeTracker == null) ?
Long.MIN_VALUE :
now - sf.getReader().timeRangeTracker.minimumTimestamp;
if (sf.isMajorCompaction() &&
(this.ttl == HConstants.FOREVER || oldest < this.ttl)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Skipping major compaction of " + this +
" because one (major) compacted file only and oldestTime " +
oldest + "ms is < ttl=" + this.ttl);
}
} else if (this.ttl != HConstants.FOREVER && oldest > this.ttl) {
LOG.debug("Major compaction triggered on store " + this +
", because keyvalues outdated; time since last major compaction " +
(now - lowTimestamp) + "ms");
result = true;
}
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Major compaction triggered on store " + this +
"; time since last major compaction " + (now - lowTimestamp) + "ms");
}
result = true;
}
}
return result;
}
long getNextMajorCompactTime() {
// default = 24hrs
long ret = conf.getLong(HConstants.MAJOR_COMPACTION_PERIOD, 1000*60*60*24);
if (family.getValue(HConstants.MAJOR_COMPACTION_PERIOD) != null) {
String strCompactionTime =
family.getValue(HConstants.MAJOR_COMPACTION_PERIOD);
ret = (new Long(strCompactionTime)).longValue();
}
if (ret > 0) {
// default = 20% = +/- 4.8 hrs
double jitterPct = conf.getFloat("hbase.hregion.majorcompaction.jitter",
0.20F);
if (jitterPct > 0) {
long jitter = Math.round(ret * jitterPct);
// deterministic jitter avoids a major compaction storm on restart
ImmutableList<StoreFile> snapshot = storefiles;
if (snapshot != null && !snapshot.isEmpty()) {
String seed = snapshot.get(0).getPath().getName();
double curRand = new Random(seed.hashCode()).nextDouble();
ret += jitter - Math.round(2L * jitter * curRand);
} else {
ret = 0; // no storefiles == no major compaction
}
}
}
return ret;
} }
public CompactionRequest requestCompaction() throws IOException { public CompactionRequest requestCompaction() throws IOException {
@ -1158,11 +1263,9 @@ public class HStore implements Store, StoreConfiguration {
CompactSelection filesToCompact; CompactSelection filesToCompact;
if (override) { if (override) {
// coprocessor is overriding normal file selection // coprocessor is overriding normal file selection
filesToCompact = new CompactSelection(candidates); filesToCompact = new CompactSelection(conf, candidates);
} else { } else {
boolean isUserCompaction = priority == Store.PRIORITY_USER; filesToCompact = compactSelection(candidates, priority);
filesToCompact = compactionPolicy.selectCompaction(candidates, isUserCompaction,
forceMajor && filesCompacting.isEmpty());
} }
if (region.getCoprocessorHost() != null) { if (region.getCoprocessorHost() != null) {
@ -1185,17 +1288,12 @@ public class HStore implements Store, StoreConfiguration {
Collections.sort(filesCompacting, StoreFile.Comparators.SEQ_ID); Collections.sort(filesCompacting, StoreFile.Comparators.SEQ_ID);
// major compaction iff all StoreFiles are included // major compaction iff all StoreFiles are included
boolean isMajor = boolean isMajor = (filesToCompact.getFilesToCompact().size() == this.storefiles.size());
(filesToCompact.getFilesToCompact().size() == this.storefiles.size());
if (isMajor) { if (isMajor) {
// since we're enqueuing a major, update the compaction wait interval // since we're enqueuing a major, update the compaction wait interval
this.forceMajor = false; this.forceMajor = false;
} }
LOG.debug(getHRegion().regionInfo.getEncodedName() + " - " +
getColumnFamilyName() + ": Initiating " +
(isMajor ? "major" : "minor") + " compaction");
// everything went better than expected. create a compaction request // everything went better than expected. create a compaction request
int pri = getCompactPriority(priority); int pri = getCompactPriority(priority);
ret = new CompactionRequest(region, this, filesToCompact, isMajor, pri); ret = new CompactionRequest(region, this, filesToCompact, isMajor, pri);
@ -1217,6 +1315,191 @@ public class HStore implements Store, StoreConfiguration {
} }
} }
/**
* Algorithm to choose which files to compact, see {@link #compactSelection(java.util.List, int)}
* @param candidates
* @return
* @throws IOException
*/
CompactSelection compactSelection(List<StoreFile> candidates) throws IOException {
return compactSelection(candidates,Store.NO_PRIORITY);
}
/**
* Algorithm to choose which files to compact
*
* Configuration knobs:
* "hbase.hstore.compaction.ratio"
* normal case: minor compact when file <= sum(smaller_files) * ratio
* "hbase.hstore.compaction.min.size"
* unconditionally compact individual files below this size
* "hbase.hstore.compaction.max.size"
* never compact individual files above this size (unless splitting)
* "hbase.hstore.compaction.min"
* min files needed to minor compact
* "hbase.hstore.compaction.max"
* max files to compact at once (avoids OOM)
*
* @param candidates candidate files, ordered from oldest to newest
* @return subset copy of candidate list that meets compaction criteria
* @throws IOException
*/
CompactSelection compactSelection(List<StoreFile> candidates, int priority)
throws IOException {
// ASSUMPTION!!! filesCompacting is locked when calling this function
/* normal skew:
*
* older ----> newer
* _
* | | _
* | | | | _
* --|-|- |-|- |-|---_-------_------- minCompactSize
* | | | | | | | | _ | |
* | | | | | | | | | | | |
* | | | | | | | | | | | |
*/
CompactSelection compactSelection = new CompactSelection(conf, candidates);
boolean forcemajor = this.forceMajor && filesCompacting.isEmpty();
if (!forcemajor) {
// Delete the expired store files before the compaction selection.
if (conf.getBoolean("hbase.store.delete.expired.storefile", true)
&& (ttl != Long.MAX_VALUE) && (this.scanInfo.minVersions == 0)) {
CompactSelection expiredSelection = compactSelection
.selectExpiredStoreFilesToCompact(
EnvironmentEdgeManager.currentTimeMillis() - this.ttl);
// If there is any expired store files, delete them by compaction.
if (expiredSelection != null) {
return expiredSelection;
}
}
// do not compact old files above a configurable threshold
// save all references. we MUST compact them
int pos = 0;
while (pos < compactSelection.getFilesToCompact().size() &&
compactSelection.getFilesToCompact().get(pos).getReader().length()
> maxCompactSize &&
!compactSelection.getFilesToCompact().get(pos).isReference()) ++pos;
if (pos != 0) compactSelection.clearSubList(0, pos);
}
if (compactSelection.getFilesToCompact().isEmpty()) {
LOG.debug(this.getHRegionInfo().getEncodedName() + " - " +
this + ": no store files to compact");
compactSelection.emptyFileList();
return compactSelection;
}
// Force a major compaction if this is a user-requested major compaction,
// or if we do not have too many files to compact and this was requested
// as a major compaction
boolean majorcompaction = (forcemajor && priority == Store.PRIORITY_USER) ||
(forcemajor || isMajorCompaction(compactSelection.getFilesToCompact())) &&
(compactSelection.getFilesToCompact().size() < this.maxFilesToCompact
);
LOG.debug(this.getHRegionInfo().getEncodedName() + " - "
+ this.getColumnFamilyName() + ": Initiating " +
(majorcompaction ? "major" : "minor") + "compaction");
if (!majorcompaction &&
!hasReferences(compactSelection.getFilesToCompact())) {
// we're doing a minor compaction, let's see what files are applicable
int start = 0;
double r = compactSelection.getCompactSelectionRatio();
// remove bulk import files that request to be excluded from minors
compactSelection.getFilesToCompact().removeAll(Collections2.filter(
compactSelection.getFilesToCompact(),
new Predicate<StoreFile>() {
public boolean apply(StoreFile input) {
return input.excludeFromMinorCompaction();
}
}));
// skip selection algorithm if we don't have enough files
if (compactSelection.getFilesToCompact().size() < this.minFilesToCompact) {
if(LOG.isDebugEnabled()) {
LOG.debug("Not compacting files because we only have " +
compactSelection.getFilesToCompact().size() +
" files ready for compaction. Need " + this.minFilesToCompact + " to initiate.");
}
compactSelection.emptyFileList();
return compactSelection;
}
/* TODO: add sorting + unit test back in when HBASE-2856 is fixed
// Sort files by size to correct when normal skew is altered by bulk load.
Collections.sort(filesToCompact, StoreFile.Comparators.FILE_SIZE);
*/
// get store file sizes for incremental compacting selection.
int countOfFiles = compactSelection.getFilesToCompact().size();
long [] fileSizes = new long[countOfFiles];
long [] sumSize = new long[countOfFiles];
for (int i = countOfFiles-1; i >= 0; --i) {
StoreFile file = compactSelection.getFilesToCompact().get(i);
fileSizes[i] = file.getReader().length();
// calculate the sum of fileSizes[i,i+maxFilesToCompact-1) for algo
int tooFar = i + this.maxFilesToCompact - 1;
sumSize[i] = fileSizes[i]
+ ((i+1 < countOfFiles) ? sumSize[i+1] : 0)
- ((tooFar < countOfFiles) ? fileSizes[tooFar] : 0);
}
/* Start at the oldest file and stop when you find the first file that
* meets compaction criteria:
* (1) a recently-flushed, small file (i.e. <= minCompactSize)
* OR
* (2) within the compactRatio of sum(newer_files)
* Given normal skew, any newer files will also meet this criteria
*
* Additional Note:
* If fileSizes.size() >> maxFilesToCompact, we will recurse on
* compact(). Consider the oldest files first to avoid a
* situation where we always compact [end-threshold,end). Then, the
* last file becomes an aggregate of the previous compactions.
*/
while(countOfFiles - start >= this.minFilesToCompact &&
fileSizes[start] >
Math.max(minCompactSize, (long)(sumSize[start+1] * r))) {
++start;
}
int end = Math.min(countOfFiles, start + this.maxFilesToCompact);
long totalSize = fileSizes[start]
+ ((start+1 < countOfFiles) ? sumSize[start+1] : 0);
compactSelection = compactSelection.getSubList(start, end);
// if we don't have enough files to compact, just wait
if (compactSelection.getFilesToCompact().size() < this.minFilesToCompact) {
if (LOG.isDebugEnabled()) {
LOG.debug("Skipped compaction of " + this
+ ". Only " + (end - start) + " file(s) of size "
+ StringUtils.humanReadableInt(totalSize)
+ " have met compaction criteria.");
}
compactSelection.emptyFileList();
return compactSelection;
}
} else {
if(majorcompaction) {
if (compactSelection.getFilesToCompact().size() > this.maxFilesToCompact) {
LOG.debug("Warning, compacting more than " + this.maxFilesToCompact +
" files, probably because of a user-requested major compaction");
if(priority != Store.PRIORITY_USER) {
LOG.error("Compacting more than max files on a non user-requested compaction");
}
}
} else if (compactSelection.getFilesToCompact().size() > this.maxFilesToCompact) {
// all files included in this compaction, up to max
int pastMax = compactSelection.getFilesToCompact().size() - this.maxFilesToCompact;
compactSelection.getFilesToCompact().subList(0, pastMax).clear();
}
}
return compactSelection;
}
/** /**
* Validates a store file by opening and closing it. In HFileV2 this should * Validates a store file by opening and closing it. In HFileV2 this should
* not be an expensive operation. * not be an expensive operation.
@ -1314,8 +1597,8 @@ public class HStore implements Store, StoreConfiguration {
// let the archive util decide if we should archive or delete the files // let the archive util decide if we should archive or delete the files
LOG.debug("Removing store files after compaction..."); LOG.debug("Removing store files after compaction...");
HFileArchiver.archiveStoreFiles(this.fs, this.region, this.conf, HFileArchiver.archiveStoreFiles(this.fs, this.region, this.conf, this.family.getName(),
this.family.getName(), compactedFiles); compactedFiles);
} catch (IOException e) { } catch (IOException e) {
e = RemoteExceptionHandler.checkIOException(e); e = RemoteExceptionHandler.checkIOException(e);
@ -1722,7 +2005,11 @@ public class HStore implements Store, StoreConfiguration {
@Override @Override
public boolean throttleCompaction(long compactionSize) { public boolean throttleCompaction(long compactionSize) {
return compactionPolicy.throttleCompaction(compactionSize); // see HBASE-5867 for discussion on the default
long throttlePoint = conf.getLong(
"hbase.regionserver.thread.compaction.throttle",
2 * this.minFilesToCompact * this.region.memstoreFlushSize);
return compactionSize > throttlePoint;
} }
@Override @Override
@ -1828,7 +2115,7 @@ public class HStore implements Store, StoreConfiguration {
@Override @Override
public boolean needsCompaction() { public boolean needsCompaction() {
return compactionPolicy.needsCompaction(storefiles.size() - filesCompacting.size()); return (storefiles.size() - filesCompacting.size()) > minFilesToCompact;
} }
@Override @Override
@ -1837,8 +2124,8 @@ public class HStore implements Store, StoreConfiguration {
} }
public static final long FIXED_OVERHEAD = public static final long FIXED_OVERHEAD =
ClassSize.align((20 * ClassSize.REFERENCE) + (4 * Bytes.SIZEOF_LONG) ClassSize.align((19 * ClassSize.REFERENCE) + (6 * Bytes.SIZEOF_LONG)
+ (3 * Bytes.SIZEOF_INT) + Bytes.SIZEOF_BOOLEAN); + (5 * Bytes.SIZEOF_INT) + Bytes.SIZEOF_BOOLEAN);
public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD
+ ClassSize.OBJECT + ClassSize.REENTRANT_LOCK + ClassSize.OBJECT + ClassSize.REENTRANT_LOCK

View File

@ -1,50 +0,0 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
/**
* The class that contains shared information about various knobs of a Store/HStore object.
* Unlike the configuration objects that merely return the XML values, the implementations
* should return ready-to-use applicable values for corresponding calls, after all the
* parsing/validation/adjustment for other considerations, so that we don't have to repeat
* this logic in multiple places.
* TODO: move methods and logic here as necessary.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public interface StoreConfiguration {
/**
* Gets the cf-specific major compaction period.
*/
public Long getMajorCompactionPeriod();
/**
* Gets the Memstore flush size for the region that this store works with.
*/
public long getMemstoreFlushSize();
/**
* Gets the cf-specific time-to-live for store files.
*/
public long getStoreFileTtl();
}

View File

@ -65,7 +65,6 @@ import org.apache.hadoop.hbase.util.BloomFilter;
import org.apache.hadoop.hbase.util.BloomFilterFactory; import org.apache.hadoop.hbase.util.BloomFilterFactory;
import org.apache.hadoop.hbase.util.BloomFilterWriter; import org.apache.hadoop.hbase.util.BloomFilterWriter;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Writables; import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.io.RawComparator; import org.apache.hadoop.io.RawComparator;
@ -296,7 +295,7 @@ public class StoreFile {
* @return True if this is a StoreFile Reference; call after {@link #open()} * @return True if this is a StoreFile Reference; call after {@link #open()}
* else may get wrong answer. * else may get wrong answer.
*/ */
public boolean isReference() { boolean isReference() {
return this.reference != null; return this.reference != null;
} }
@ -358,7 +357,7 @@ public class StoreFile {
/** /**
* @return True if this file was made by a major compaction. * @return True if this file was made by a major compaction.
*/ */
public boolean isMajorCompaction() { boolean isMajorCompaction() {
if (this.majorCompaction == null) { if (this.majorCompaction == null) {
throw new NullPointerException("This has not been set yet"); throw new NullPointerException("This has not been set yet");
} }
@ -368,7 +367,7 @@ public class StoreFile {
/** /**
* @return True if this file should not be part of a minor compaction. * @return True if this file should not be part of a minor compaction.
*/ */
public boolean excludeFromMinorCompaction() { boolean excludeFromMinorCompaction() {
return this.excludeFromMinorCompaction; return this.excludeFromMinorCompaction;
} }
@ -580,6 +579,7 @@ public class StoreFile {
} }
} }
} }
this.reader.setSequenceID(this.sequenceid); this.reader.setSequenceID(this.sequenceid);
b = metadataMap.get(HFileWriterV2.MAX_MEMSTORE_TS_KEY); b = metadataMap.get(HFileWriterV2.MAX_MEMSTORE_TS_KEY);
@ -945,11 +945,6 @@ public class StoreFile {
return r.write(fs, p); return r.write(fs, p);
} }
public Long getMinimumTimestamp() {
return (getReader().timeRangeTracker == null) ?
null :
getReader().timeRangeTracker.minimumTimestamp;
}
/** /**
* A StoreFile writer. Use this to read/write HBase Store Files. It is package * A StoreFile writer. Use this to read/write HBase Store Files. It is package

View File

@ -1,64 +0,0 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
/**
* Utility functions for region server storage layer.
*/
public class StoreUtils {
/**
* Creates a deterministic hash code for store file collection.
*/
public static Integer getDeterministicRandomSeed(final List<StoreFile> files) {
if (files != null && !files.isEmpty()) {
return files.get(0).getPath().getName().hashCode();
}
return null;
}
/**
* Determines whether any files in the collection are references.
*/
public static boolean hasReferences(final Collection<StoreFile> files) {
if (files != null && files.size() > 0) {
for (StoreFile hsf: files) {
if (hsf.isReference()) {
return true;
}
}
}
return false;
}
/**
* Gets lowest timestamp from candidate StoreFiles
*/
public static long getLowestTimestamp(final List<StoreFile> candidates)
throws IOException {
long minTs = Long.MAX_VALUE;
for (StoreFile storeFile : candidates) {
minTs = Math.min(minTs, storeFile.getModificationTimeStamp());
}
return minTs;
}
}

View File

@ -19,13 +19,15 @@
package org.apache.hadoop.hbase.regionserver.compactions; package org.apache.hadoop.hbase.regionserver.compactions;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Calendar;
import java.util.GregorianCalendar;
import java.util.List; import java.util.List;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.regionserver.StoreFile; import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@InterfaceAudience.Private @InterfaceAudience.Private
public class CompactSelection { public class CompactSelection {
@ -46,15 +48,37 @@ public class CompactSelection {
*/ */
private final static Object compactionCountLock = new Object(); private final static Object compactionCountLock = new Object();
// HBase conf object
Configuration conf;
// was this compaction promoted to an off-peak // was this compaction promoted to an off-peak
boolean isOffPeakCompaction = false; boolean isOffPeakCompaction = false;
// CompactSelection object creation time. // compactRatio: double on purpose! Float.MAX < Long.MAX < Double.MAX
private final long selectionTime; // With float, java will downcast your long to float for comparisons (bad)
private double compactRatio;
// compaction ratio off-peak
private double compactRatioOffPeak;
// offpeak start time
private int offPeakStartHour = -1;
// off peak end time
private int offPeakEndHour = -1;
public CompactSelection(List<StoreFile> filesToCompact) { public CompactSelection(Configuration conf, List<StoreFile> filesToCompact) {
this.selectionTime = EnvironmentEdgeManager.currentTimeMillis();
this.filesToCompact = filesToCompact; this.filesToCompact = filesToCompact;
this.isOffPeakCompaction = false; this.conf = conf;
this.compactRatio = conf.getFloat("hbase.hstore.compaction.ratio", 1.2F);
this.compactRatioOffPeak = conf.getFloat("hbase.hstore.compaction.ratio.offpeak", 5.0F);
// Peak time is from [offPeakStartHour, offPeakEndHour). Valid numbers are [0, 23]
this.offPeakStartHour = conf.getInt("hbase.offpeak.start.hour", -1);
this.offPeakEndHour = conf.getInt("hbase.offpeak.end.hour", -1);
if (!isValidHour(this.offPeakStartHour) || !isValidHour(this.offPeakEndHour)) {
if (!(this.offPeakStartHour == -1 && this.offPeakEndHour == -1)) {
LOG.warn("Invalid start/end hour for peak hour : start = " +
this.offPeakStartHour + " end = " + this.offPeakEndHour +
". Valid numbers are [0-23]");
}
this.offPeakStartHour = this.offPeakEndHour = -1;
}
} }
/** /**
@ -89,25 +113,49 @@ public class CompactSelection {
} }
if (hasExpiredStoreFiles) { if (hasExpiredStoreFiles) {
expiredSFSelection = new CompactSelection(expiredStoreFiles); expiredSFSelection = new CompactSelection(conf, expiredStoreFiles);
} }
return expiredSFSelection; return expiredSFSelection;
} }
/**
* If the current hour falls in the off peak times and there are no
* outstanding off peak compactions, the current compaction is
* promoted to an off peak compaction. Currently only one off peak
* compaction is present in the compaction queue.
*
* @param currentHour
* @return
*/
public double getCompactSelectionRatio() {
double r = this.compactRatio;
synchronized(compactionCountLock) {
if (isOffPeakHour() && numOutstandingOffPeakCompactions == 0) {
r = this.compactRatioOffPeak;
numOutstandingOffPeakCompactions++;
isOffPeakCompaction = true;
}
}
if(isOffPeakCompaction) {
LOG.info("Running an off-peak compaction, selection ratio = " +
compactRatioOffPeak + ", numOutstandingOffPeakCompactions is now " +
numOutstandingOffPeakCompactions);
}
return r;
}
/** /**
* The current compaction finished, so reset the off peak compactions count * The current compaction finished, so reset the off peak compactions count
* if this was an off peak compaction. * if this was an off peak compaction.
*/ */
public void finishRequest() { public void finishRequest() {
if (isOffPeakCompaction) { if (isOffPeakCompaction) {
long newValueToLog = -1;
synchronized(compactionCountLock) { synchronized(compactionCountLock) {
assert !isOffPeakCompaction : "Double-counting off-peak count for compaction"; numOutstandingOffPeakCompactions--;
newValueToLog = --numOutstandingOffPeakCompactions;
isOffPeakCompaction = false; isOffPeakCompaction = false;
} }
LOG.info("Compaction done, numOutstandingOffPeakCompactions is now " + LOG.info("Compaction done, numOutstandingOffPeakCompactions is now " +
newValueToLog); numOutstandingOffPeakCompactions);
} }
} }
@ -122,14 +170,13 @@ public class CompactSelection {
public void emptyFileList() { public void emptyFileList() {
filesToCompact.clear(); filesToCompact.clear();
if (isOffPeakCompaction) { if (isOffPeakCompaction) {
long newValueToLog = -1;
synchronized(compactionCountLock) { synchronized(compactionCountLock) {
// reset the off peak count // reset the off peak count
newValueToLog = --numOutstandingOffPeakCompactions; numOutstandingOffPeakCompactions--;
isOffPeakCompaction = false; isOffPeakCompaction = false;
} }
LOG.info("Nothing to compact, numOutstandingOffPeakCompactions is now " + LOG.info("Nothing to compact, numOutstandingOffPeakCompactions is now " +
newValueToLog); numOutstandingOffPeakCompactions);
} }
} }
@ -137,30 +184,16 @@ public class CompactSelection {
return this.isOffPeakCompaction; return this.isOffPeakCompaction;
} }
public static long getNumOutStandingOffPeakCompactions() { private boolean isOffPeakHour() {
synchronized(compactionCountLock) { int currentHour = (new GregorianCalendar()).get(Calendar.HOUR_OF_DAY);
return numOutstandingOffPeakCompactions; // If offpeak time checking is disabled just return false.
if (this.offPeakStartHour == this.offPeakEndHour) {
return false;
} }
} if (this.offPeakStartHour < this.offPeakEndHour) {
return (currentHour >= this.offPeakStartHour && currentHour < this.offPeakEndHour);
/**
* Tries making the compaction off-peak.
* Only checks internal compaction constraints, not timing.
* @return Eventual value of isOffPeakCompaction.
*/
public boolean trySetOffpeak() {
assert !isOffPeakCompaction : "Double-setting off-peak for compaction " + this;
synchronized(compactionCountLock) {
if (numOutstandingOffPeakCompactions == 0) {
numOutstandingOffPeakCompactions++;
isOffPeakCompaction = true;
}
} }
return isOffPeakCompaction; return (currentHour >= this.offPeakStartHour || currentHour < this.offPeakEndHour);
}
public long getSelectionTime() {
return selectionTime;
} }
public CompactSelection subList(int start, int end) { public CompactSelection subList(int start, int end) {

View File

@ -1,214 +0,0 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.compactions;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.regionserver.StoreConfiguration;
/**
* Compaction configuration for a particular instance of HStore.
* Takes into account both global settings and ones set on the column family/store.
* Control knobs for default compaction algorithm:
* <p/>
* maxCompactSize - upper bound on file size to be included in minor compactions
* minCompactSize - lower bound below which compaction is selected without ratio test
* minFilesToCompact - lower bound on number of files in any minor compaction
* maxFilesToCompact - upper bound on number of files in any minor compaction
* compactionRatio - Ratio used for compaction
* <p/>
* Set parameter as "hbase.hstore.compaction.<attribute>"
*/
//TODO: revisit this class for online parameter updating (both in xml and on the CF)
@InterfaceAudience.Private
public class CompactionConfiguration {
static final Log LOG = LogFactory.getLog(CompactionConfiguration.class);
private static final String CONFIG_PREFIX = "hbase.hstore.compaction.";
Configuration conf;
StoreConfiguration storeConfig;
long maxCompactSize;
long minCompactSize;
int minFilesToCompact;
int maxFilesToCompact;
double compactionRatio;
double offPeekCompactionRatio;
int offPeakStartHour;
int offPeakEndHour;
long throttlePoint;
boolean shouldDeleteExpired;
long majorCompactionPeriod;
float majorCompactionJitter;
CompactionConfiguration(Configuration conf, StoreConfiguration storeConfig) {
this.conf = conf;
this.storeConfig = storeConfig;
maxCompactSize = conf.getLong(CONFIG_PREFIX + "max.size", Long.MAX_VALUE);
minCompactSize = conf.getLong(CONFIG_PREFIX + "min.size",
storeConfig.getMemstoreFlushSize());
minFilesToCompact = Math.max(2, conf.getInt(CONFIG_PREFIX + "min",
/*old name*/ conf.getInt("hbase.hstore.compactionThreshold", 3)));
maxFilesToCompact = conf.getInt(CONFIG_PREFIX + "max", 10);
compactionRatio = conf.getFloat(CONFIG_PREFIX + "ratio", 1.2F);
offPeekCompactionRatio = conf.getFloat(CONFIG_PREFIX + "ratio.offpeak", 5.0F);
offPeakStartHour = conf.getInt("hbase.offpeak.start.hour", -1);
offPeakEndHour = conf.getInt("hbase.offpeak.end.hour", -1);
if (!isValidHour(offPeakStartHour) || !isValidHour(offPeakEndHour)) {
if (!(offPeakStartHour == -1 && offPeakEndHour == -1)) {
LOG.warn("Ignoring invalid start/end hour for peak hour : start = " +
this.offPeakStartHour + " end = " + this.offPeakEndHour +
". Valid numbers are [0-23]");
}
this.offPeakStartHour = this.offPeakEndHour = -1;
}
throttlePoint = conf.getLong("hbase.regionserver.thread.compaction.throttle",
2 * maxFilesToCompact * storeConfig.getMemstoreFlushSize());
shouldDeleteExpired = conf.getBoolean("hbase.store.delete.expired.storefile", true);
majorCompactionPeriod = conf.getLong(HConstants.MAJOR_COMPACTION_PERIOD, 1000*60*60*24);
majorCompactionJitter = conf.getFloat("hbase.hregion.majorcompaction.jitter", 0.20F);
LOG.info("Compaction configuration " + this.toString());
}
@Override
public String toString() {
return String.format(
"size [%d, %d); files [%d, %d); ratio %f; off-peak ratio %f; off-peak hours %d-%d; "
+ "throttle point %d;%s delete expired; major period %d, major jitter %f",
minCompactSize,
maxCompactSize,
minFilesToCompact,
maxFilesToCompact,
compactionRatio,
offPeekCompactionRatio,
offPeakStartHour,
offPeakEndHour,
throttlePoint,
shouldDeleteExpired ? "" : " don't",
majorCompactionPeriod,
majorCompactionJitter);
}
/**
* @return lower bound below which compaction is selected without ratio test
*/
long getMinCompactSize() {
return minCompactSize;
}
/**
* @return upper bound on file size to be included in minor compactions
*/
long getMaxCompactSize() {
return maxCompactSize;
}
/**
* @return upper bound on number of files to be included in minor compactions
*/
int getMinFilesToCompact() {
return minFilesToCompact;
}
/**
* @return upper bound on number of files to be included in minor compactions
*/
int getMaxFilesToCompact() {
return maxFilesToCompact;
}
/**
* @return Ratio used for compaction
*/
double getCompactionRatio() {
return compactionRatio;
}
/**
* @return Off peak Ratio used for compaction
*/
double getCompactionRatioOffPeak() {
return offPeekCompactionRatio;
}
/**
* @return Hour at which off-peak compactions start
*/
int getOffPeakStartHour() {
return offPeakStartHour;
}
/**
* @return Hour at which off-peak compactions end
*/
int getOffPeakEndHour() {
return offPeakEndHour;
}
/**
* @return ThrottlePoint used for classifying small and large compactions
*/
long getThrottlePoint() {
return throttlePoint;
}
/**
* @return Major compaction period from compaction.
* Major compactions are selected periodically according to this parameter plus jitter
*/
long getMajorCompactionPeriod() {
if (storeConfig != null) {
Long storeSpecificPeriod = storeConfig.getMajorCompactionPeriod();
if (storeSpecificPeriod != null) {
return storeSpecificPeriod;
}
}
return majorCompactionPeriod;
}
/**
* @return Major the jitter fraction, the fraction within which the major compaction
* period is randomly chosen from the majorCompactionPeriod in each store.
*/
float getMajorCompactionJitter() {
return majorCompactionJitter;
}
/**
* @return Whether expired files should be deleted ASAP using compactions
*/
boolean shouldDeleteExpired() {
return shouldDeleteExpired;
}
private static boolean isValidHour(int hour) {
return (hour >= 0 && hour <= 23);
}
}

View File

@ -1,409 +0,0 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.compactions;
import com.google.common.base.Predicate;
import com.google.common.collect.Collections2;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.regionserver.StoreConfiguration;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.util.StringUtils;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.GregorianCalendar;
import java.util.List;
import java.util.Random;
/**
* The default (and only, as of now) algorithm for selecting files for compaction.
* Combines the compaction configuration and the provisional file selection that
* it's given to produce the list of suitable candidates for compaction.
*/
@InterfaceAudience.Private
public class CompactionPolicy {
private static final Log LOG = LogFactory.getLog(CompactionPolicy.class);
private final static Calendar calendar = new GregorianCalendar();
CompactionConfiguration comConf;
StoreConfiguration storeConfig;
public CompactionPolicy(Configuration configuration, StoreConfiguration storeConfig) {
updateConfiguration(configuration, storeConfig);
}
/**
* @param candidateFiles candidate files, ordered from oldest to newest
* @return subset copy of candidate list that meets compaction criteria
* @throws java.io.IOException
*/
public CompactSelection selectCompaction(List<StoreFile> candidateFiles,
boolean isUserCompaction, boolean forceMajor)
throws IOException {
// Prelimanry compaction subject to filters
CompactSelection candidateSelection = new CompactSelection(candidateFiles);
long cfTtl = this.storeConfig.getStoreFileTtl();
if (!forceMajor) {
// If there are expired files, only select them so that compaction deletes them
if (comConf.shouldDeleteExpired() && (cfTtl != Long.MAX_VALUE)) {
CompactSelection expiredSelection = selectExpiredStoreFiles(
candidateSelection, EnvironmentEdgeManager.currentTimeMillis() - cfTtl);
if (expiredSelection != null) {
return expiredSelection;
}
}
candidateSelection = skipLargeFiles(candidateSelection);
}
// Force a major compaction if this is a user-requested major compaction,
// or if we do not have too many files to compact and this was requested
// as a major compaction.
// Or, if there are any references among the candidates.
boolean majorCompaction = (
(forceMajor && isUserCompaction)
|| ((forceMajor || isMajorCompaction(candidateSelection.getFilesToCompact()))
&& (candidateSelection.getFilesToCompact().size() < comConf.getMaxFilesToCompact()))
|| StoreUtils.hasReferences(candidateSelection.getFilesToCompact())
);
if (!majorCompaction) {
// we're doing a minor compaction, let's see what files are applicable
candidateSelection = filterBulk(candidateSelection);
candidateSelection = applyCompactionPolicy(candidateSelection);
candidateSelection = checkMinFilesCriteria(candidateSelection);
}
candidateSelection =
removeExcessFiles(candidateSelection, isUserCompaction, majorCompaction);
return candidateSelection;
}
/**
* Updates the compaction configuration. Used for tests.
* TODO: replace when HBASE-3909 is completed in some form.
*/
public void updateConfiguration(Configuration configuration,
StoreConfiguration storeConfig) {
this.comConf = new CompactionConfiguration(configuration, storeConfig);
this.storeConfig = storeConfig;
}
/**
* Select the expired store files to compact
*
* @param candidates the initial set of storeFiles
* @param maxExpiredTimeStamp
* The store file will be marked as expired if its max time stamp is
* less than this maxExpiredTimeStamp.
* @return A CompactSelection contains the expired store files as
* filesToCompact
*/
private CompactSelection selectExpiredStoreFiles(
CompactSelection candidates, long maxExpiredTimeStamp) {
List<StoreFile> filesToCompact = candidates.getFilesToCompact();
if (filesToCompact == null || filesToCompact.size() == 0)
return null;
ArrayList<StoreFile> expiredStoreFiles = null;
boolean hasExpiredStoreFiles = false;
CompactSelection expiredSFSelection = null;
for (StoreFile storeFile : filesToCompact) {
if (storeFile.getReader().getMaxTimestamp() < maxExpiredTimeStamp) {
LOG.info("Deleting the expired store file by compaction: "
+ storeFile.getPath() + " whose maxTimeStamp is "
+ storeFile.getReader().getMaxTimestamp()
+ " while the max expired timestamp is " + maxExpiredTimeStamp);
if (!hasExpiredStoreFiles) {
expiredStoreFiles = new ArrayList<StoreFile>();
hasExpiredStoreFiles = true;
}
expiredStoreFiles.add(storeFile);
}
}
if (hasExpiredStoreFiles) {
expiredSFSelection = new CompactSelection(expiredStoreFiles);
}
return expiredSFSelection;
}
/**
* @param candidates pre-filtrate
* @return filtered subset
* exclude all files above maxCompactSize
* Also save all references. We MUST compact them
*/
private CompactSelection skipLargeFiles(CompactSelection candidates) {
int pos = 0;
while (pos < candidates.getFilesToCompact().size() &&
candidates.getFilesToCompact().get(pos).getReader().length() >
comConf.getMaxCompactSize() &&
!candidates.getFilesToCompact().get(pos).isReference()) {
++pos;
}
if (pos > 0) {
LOG.debug("Some files are too large. Excluding " + pos
+ " files from compaction candidates");
candidates.clearSubList(0, pos);
}
return candidates;
}
/**
* @param candidates pre-filtrate
* @return filtered subset
* exclude all bulk load files if configured
*/
private CompactSelection filterBulk(CompactSelection candidates) {
candidates.getFilesToCompact().removeAll(Collections2.filter(
candidates.getFilesToCompact(),
new Predicate<StoreFile>() {
@Override
public boolean apply(StoreFile input) {
return input.excludeFromMinorCompaction();
}
}));
return candidates;
}
/**
* @param candidates pre-filtrate
* @return filtered subset
* take upto maxFilesToCompact from the start
*/
private CompactSelection removeExcessFiles(CompactSelection candidates,
boolean isUserCompaction, boolean isMajorCompaction) {
int excess = candidates.getFilesToCompact().size() - comConf.getMaxFilesToCompact();
if (excess > 0) {
if (isMajorCompaction && isUserCompaction) {
LOG.debug("Warning, compacting more than " + comConf.getMaxFilesToCompact() +
" files because of a user-requested major compaction");
} else {
LOG.debug("Too many admissible files. Excluding " + excess
+ " files from compaction candidates");
candidates.clearSubList(comConf.getMaxFilesToCompact(),
candidates.getFilesToCompact().size());
}
}
return candidates;
}
/**
* @param candidates pre-filtrate
* @return filtered subset
* forget the compactionSelection if we don't have enough files
*/
private CompactSelection checkMinFilesCriteria(CompactSelection candidates) {
int minFiles = comConf.getMinFilesToCompact();
if (candidates.getFilesToCompact().size() < minFiles) {
if(LOG.isDebugEnabled()) {
LOG.debug("Not compacting files because we only have " +
candidates.getFilesToCompact().size() +
" files ready for compaction. Need " + minFiles + " to initiate.");
}
candidates.emptyFileList();
}
return candidates;
}
/**
* @param candidates pre-filtrate
* @return filtered subset
* -- Default minor compaction selection algorithm:
* choose CompactSelection from candidates --
* First exclude bulk-load files if indicated in configuration.
* Start at the oldest file and stop when you find the first file that
* meets compaction criteria:
* (1) a recently-flushed, small file (i.e. <= minCompactSize)
* OR
* (2) within the compactRatio of sum(newer_files)
* Given normal skew, any newer files will also meet this criteria
* <p/>
* Additional Note:
* If fileSizes.size() >> maxFilesToCompact, we will recurse on
* compact(). Consider the oldest files first to avoid a
* situation where we always compact [end-threshold,end). Then, the
* last file becomes an aggregate of the previous compactions.
*
* normal skew:
*
* older ----> newer (increasing seqID)
* _
* | | _
* | | | | _
* --|-|- |-|- |-|---_-------_------- minCompactSize
* | | | | | | | | _ | |
* | | | | | | | | | | | |
* | | | | | | | | | | | |
*/
CompactSelection applyCompactionPolicy(CompactSelection candidates) throws IOException {
if (candidates.getFilesToCompact().isEmpty()) {
return candidates;
}
// we're doing a minor compaction, let's see what files are applicable
int start = 0;
double ratio = comConf.getCompactionRatio();
if (isOffPeakHour() && candidates.trySetOffpeak()) {
ratio = comConf.getCompactionRatioOffPeak();
LOG.info("Running an off-peak compaction, selection ratio = " + ratio
+ ", numOutstandingOffPeakCompactions is now "
+ CompactSelection.getNumOutStandingOffPeakCompactions());
}
// get store file sizes for incremental compacting selection.
int countOfFiles = candidates.getFilesToCompact().size();
long[] fileSizes = new long[countOfFiles];
long[] sumSize = new long[countOfFiles];
for (int i = countOfFiles - 1; i >= 0; --i) {
StoreFile file = candidates.getFilesToCompact().get(i);
fileSizes[i] = file.getReader().length();
// calculate the sum of fileSizes[i,i+maxFilesToCompact-1) for algo
int tooFar = i + comConf.getMaxFilesToCompact() - 1;
sumSize[i] = fileSizes[i]
+ ((i + 1 < countOfFiles) ? sumSize[i + 1] : 0)
- ((tooFar < countOfFiles) ? fileSizes[tooFar] : 0);
}
while (countOfFiles - start >= comConf.getMinFilesToCompact() &&
fileSizes[start] > Math.max(comConf.getMinCompactSize(),
(long) (sumSize[start + 1] * ratio))) {
++start;
}
if (start < countOfFiles) {
LOG.info("Default compaction algorithm has selected " + (countOfFiles - start)
+ " files from " + countOfFiles + " candidates");
}
candidates = candidates.getSubList(start, countOfFiles);
return candidates;
}
/*
* @param filesToCompact Files to compact. Can be null.
* @return True if we should run a major compaction.
*/
public boolean isMajorCompaction(final List<StoreFile> filesToCompact)
throws IOException {
boolean result = false;
long mcTime = getNextMajorCompactTime(filesToCompact);
if (filesToCompact == null || filesToCompact.isEmpty() || mcTime == 0) {
return result;
}
// TODO: Use better method for determining stamp of last major (HBASE-2990)
long lowTimestamp = StoreUtils.getLowestTimestamp(filesToCompact);
long now = System.currentTimeMillis();
if (lowTimestamp > 0l && lowTimestamp < (now - mcTime)) {
// Major compaction time has elapsed.
long cfTtl = this.storeConfig.getStoreFileTtl();
if (filesToCompact.size() == 1) {
// Single file
StoreFile sf = filesToCompact.get(0);
Long minTimestamp = sf.getMinimumTimestamp();
long oldest = (minTimestamp == null)
? Long.MIN_VALUE
: now - minTimestamp.longValue();
if (sf.isMajorCompaction() &&
(cfTtl == HConstants.FOREVER || oldest < cfTtl)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Skipping major compaction of " + this +
" because one (major) compacted file only and oldestTime " +
oldest + "ms is < ttl=" + cfTtl);
}
} else if (cfTtl != HConstants.FOREVER && oldest > cfTtl) {
LOG.debug("Major compaction triggered on store " + this +
", because keyvalues outdated; time since last major compaction " +
(now - lowTimestamp) + "ms");
result = true;
}
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Major compaction triggered on store " + this +
"; time since last major compaction " + (now - lowTimestamp) + "ms");
}
result = true;
}
}
return result;
}
public long getNextMajorCompactTime(final List<StoreFile> filesToCompact) {
// default = 24hrs
long ret = comConf.getMajorCompactionPeriod();
if (ret > 0) {
// default = 20% = +/- 4.8 hrs
double jitterPct = comConf.getMajorCompactionJitter();
if (jitterPct > 0) {
long jitter = Math.round(ret * jitterPct);
// deterministic jitter avoids a major compaction storm on restart
Integer seed = StoreUtils.getDeterministicRandomSeed(filesToCompact);
if (seed != null) {
double rnd = (new Random(seed)).nextDouble();
ret += jitter - Math.round(2L * jitter * rnd);
} else {
ret = 0; // no storefiles == no major compaction
}
}
}
return ret;
}
/**
* @param compactionSize Total size of some compaction
* @return whether this should be a large or small compaction
*/
public boolean throttleCompaction(long compactionSize) {
return compactionSize > comConf.getThrottlePoint();
}
/**
* @param numCandidates Number of candidate store files
* @return whether a compactionSelection is possible
*/
public boolean needsCompaction(int numCandidates) {
return numCandidates > comConf.getMinFilesToCompact();
}
/**
* @return whether this is off-peak hour
*/
private boolean isOffPeakHour() {
int currentHour = calendar.get(Calendar.HOUR_OF_DAY);
int startHour = comConf.getOffPeakStartHour();
int endHour = comConf.getOffPeakEndHour();
// If offpeak time checking is disabled just return false.
if (startHour == endHour) {
return false;
}
if (startHour < endHour) {
return (currentHour >= startHour && currentHour < endHour);
}
return (currentHour >= startHour || currentHour < endHour);
}
}

View File

@ -208,10 +208,6 @@ public class CompactionRequest implements Comparable<CompactionRequest>,
return p; return p;
} }
public long getSelectionTime() {
return compactSelection.getSelectionTime();
}
/** Gets the priority for the request */ /** Gets the priority for the request */
public void setPriority(int p) { public void setPriority(int p) {
this.p = p; this.p = p;
@ -275,7 +271,7 @@ public class CompactionRequest implements Comparable<CompactionRequest>,
server.checkFileSystem(); server.checkFileSystem();
} finally { } finally {
s.finishRequest(this); s.finishRequest(this);
LOG.debug("CompactSplitThread Status: " + server.compactSplitThread); LOG.debug("CompactSplitThread status: " + server.compactSplitThread);
} }
} }

View File

@ -49,7 +49,8 @@ import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl; import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder; import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
import org.apache.hadoop.hbase.io.hfile.HFileScanner; import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.regionserver.compactions.*; import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.wal.HLog; import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.junit.experimental.categories.Category; import org.junit.experimental.categories.Category;
@ -301,7 +302,6 @@ public class TestCompaction extends HBaseTestCase {
conf.setFloat("hbase.hregion.majorcompaction.jitter", jitterPct); conf.setFloat("hbase.hregion.majorcompaction.jitter", jitterPct);
HStore s = ((HStore) r.getStore(COLUMN_FAMILY)); HStore s = ((HStore) r.getStore(COLUMN_FAMILY));
s.compactionPolicy.updateConfiguration(conf, s);
try { try {
createStoreFile(r); createStoreFile(r);
createStoreFile(r); createStoreFile(r);
@ -313,11 +313,9 @@ public class TestCompaction extends HBaseTestCase {
assertEquals(2, s.getStorefilesCount()); assertEquals(2, s.getStorefilesCount());
// ensure that major compaction time is deterministic // ensure that major compaction time is deterministic
CompactionPolicy c = s.compactionPolicy; long mcTime = s.getNextMajorCompactTime();
List<StoreFile> storeFiles = s.getStorefiles();
long mcTime = c.getNextMajorCompactTime(storeFiles);
for (int i = 0; i < 10; ++i) { for (int i = 0; i < 10; ++i) {
assertEquals(mcTime, c.getNextMajorCompactTime(storeFiles)); assertEquals(mcTime, s.getNextMajorCompactTime());
} }
// ensure that the major compaction time is within the variance // ensure that the major compaction time is within the variance

View File

@ -1,321 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Calendar;
import java.util.GregorianCalendar;
import java.util.List;
import junit.framework.TestCase;
import org.junit.experimental.categories.Category;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.NoOpDataBlockEncoder;
import org.apache.hadoop.hbase.regionserver.compactions.*;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import com.google.common.collect.Lists;
@Category(SmallTests.class)
public class TestDefaultCompactSelection extends TestCase {
private final static Log LOG = LogFactory.getLog(TestDefaultCompactSelection.class);
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
protected Configuration conf;
protected HStore store;
private static final String DIR=
TEST_UTIL.getDataTestDir(TestDefaultCompactSelection.class.getSimpleName()).toString();
private static Path TEST_FILE;
private CompactionPolicy manager;
protected static final int minFiles = 3;
protected static final int maxFiles = 5;
protected static final long minSize = 10;
protected static final long maxSize = 1000;
@Override
public void setUp() throws Exception {
// setup config values necessary for store
this.conf = TEST_UTIL.getConfiguration();
this.conf.setLong(HConstants.MAJOR_COMPACTION_PERIOD, 0);
this.conf.setInt("hbase.hstore.compaction.min", minFiles);
this.conf.setInt("hbase.hstore.compaction.max", maxFiles);
this.conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, minSize);
this.conf.setLong("hbase.hstore.compaction.max.size", maxSize);
this.conf.setFloat("hbase.hstore.compaction.ratio", 1.0F);
//Setting up a Store
Path basedir = new Path(DIR);
String logName = "logs";
Path logdir = new Path(DIR, logName);
Path oldLogDir = new Path(basedir, HConstants.HREGION_OLDLOGDIR_NAME);
HColumnDescriptor hcd = new HColumnDescriptor(Bytes.toBytes("family"));
FileSystem fs = FileSystem.get(conf);
fs.delete(logdir, true);
HTableDescriptor htd = new HTableDescriptor(Bytes.toBytes("table"));
htd.addFamily(hcd);
HRegionInfo info = new HRegionInfo(htd.getName(), null, null, false);
HLog hlog = HLogFactory.createHLog(fs, basedir,
logName, conf);
HRegion region = HRegion.createHRegion(info, basedir, conf, htd);
HRegion.closeHRegion(region);
Path tableDir = new Path(basedir, Bytes.toString(htd.getName()));
region = new HRegion(tableDir, hlog, fs, conf, info, htd, null);
store = new HStore(basedir, region, hcd, fs, conf);
manager = store.compactionPolicy;
TEST_FILE = StoreFile.getRandomFilename(fs, store.getHomedir());
fs.create(TEST_FILE);
}
// used so our tests don't deal with actual StoreFiles
static class MockStoreFile extends StoreFile {
long length = 0;
boolean isRef = false;
long ageInDisk;
long sequenceid;
MockStoreFile(long length, long ageInDisk, boolean isRef, long sequenceid) throws IOException {
super(TEST_UTIL.getTestFileSystem(), TEST_FILE, TEST_UTIL.getConfiguration(),
new CacheConfig(TEST_UTIL.getConfiguration()), BloomType.NONE,
NoOpDataBlockEncoder.INSTANCE);
this.length = length;
this.isRef = isRef;
this.ageInDisk = ageInDisk;
this.sequenceid = sequenceid;
}
void setLength(long newLen) {
this.length = newLen;
}
@Override
public long getMaxSequenceId() {
return sequenceid;
}
@Override
public boolean isMajorCompaction() {
return false;
}
@Override
public boolean isReference() {
return this.isRef;
}
@Override
public StoreFile.Reader getReader() {
final long len = this.length;
return new StoreFile.Reader() {
@Override
public long length() {
return len;
}
};
}
}
ArrayList<Long> toArrayList(long... numbers) {
ArrayList<Long> result = new ArrayList<Long>();
for (long i : numbers) {
result.add(i);
}
return result;
}
List<StoreFile> sfCreate(long... sizes) throws IOException {
ArrayList<Long> ageInDisk = new ArrayList<Long>();
for (int i = 0; i < sizes.length; i++) {
ageInDisk.add(0L);
}
return sfCreate(toArrayList(sizes), ageInDisk);
}
List<StoreFile> sfCreate(ArrayList<Long> sizes, ArrayList<Long> ageInDisk)
throws IOException {
return sfCreate(false, sizes, ageInDisk);
}
List<StoreFile> sfCreate(boolean isReference, long... sizes) throws IOException {
ArrayList<Long> ageInDisk = new ArrayList<Long>(sizes.length);
for (int i = 0; i < sizes.length; i++) {
ageInDisk.add(0L);
}
return sfCreate(isReference, toArrayList(sizes), ageInDisk);
}
List<StoreFile> sfCreate(boolean isReference, ArrayList<Long> sizes, ArrayList<Long> ageInDisk)
throws IOException {
List<StoreFile> ret = Lists.newArrayList();
for (int i = 0; i < sizes.size(); i++) {
ret.add(new MockStoreFile(sizes.get(i), ageInDisk.get(i), isReference, i));
}
return ret;
}
long[] getSizes(List<StoreFile> sfList) {
long[] aNums = new long[sfList.size()];
for (int i = 0; i < sfList.size(); ++i) {
aNums[i] = sfList.get(i).getReader().length();
}
return aNums;
}
void compactEquals(List<StoreFile> candidates, long... expected)
throws IOException {
compactEquals(candidates, false, expected);
}
void compactEquals(List<StoreFile> candidates, boolean forcemajor,
long ... expected)
throws IOException {
store.forceMajor = forcemajor;
//Test Default compactions
List<StoreFile> actual = store.compactionPolicy
.selectCompaction(candidates, false, forcemajor).getFilesToCompact();
assertEquals(Arrays.toString(expected), Arrays.toString(getSizes(actual)));
store.forceMajor = false;
}
public void testCompactionRatio() throws IOException {
/**
* NOTE: these tests are specific to describe the implementation of the
* current compaction algorithm. Developed to ensure that refactoring
* doesn't implicitly alter this.
*/
long tooBig = maxSize + 1;
// default case. preserve user ratio on size
compactEquals(sfCreate(100,50,23,12,12), 23, 12, 12);
// less than compact threshold = don't compact
compactEquals(sfCreate(100,50,25,12,12) /* empty */);
// greater than compact size = skip those
compactEquals(sfCreate(tooBig, tooBig, 700, 700, 700), 700, 700, 700);
// big size + threshold
compactEquals(sfCreate(tooBig, tooBig, 700,700) /* empty */);
// small files = don't care about ratio
compactEquals(sfCreate(8,3,1), 8,3,1);
/* TODO: add sorting + unit test back in when HBASE-2856 is fixed
// sort first so you don't include huge file the tail end.
// happens with HFileOutputFormat bulk migration
compactEquals(sfCreate(100,50,23,12,12, 500), 23, 12, 12);
*/
// don't exceed max file compact threshold
// note: file selection starts with largest to smallest.
compactEquals(sfCreate(7, 6, 5, 4, 3, 2, 1), 7, 6, 5, 4, 3);
/* MAJOR COMPACTION */
// if a major compaction has been forced, then compact everything
compactEquals(sfCreate(50,25,12,12), true, 50, 25, 12, 12);
// also choose files < threshold on major compaction
compactEquals(sfCreate(12,12), true, 12, 12);
// even if one of those files is too big
compactEquals(sfCreate(tooBig, 12,12), true, tooBig, 12, 12);
// don't exceed max file compact threshold, even with major compaction
store.forceMajor = true;
compactEquals(sfCreate(7, 6, 5, 4, 3, 2, 1), 7, 6, 5, 4, 3);
store.forceMajor = false;
// if we exceed maxCompactSize, downgrade to minor
// if not, it creates a 'snowball effect' when files >> maxCompactSize:
// the last file in compaction is the aggregate of all previous compactions
compactEquals(sfCreate(100,50,23,12,12), true, 23, 12, 12);
conf.setLong(HConstants.MAJOR_COMPACTION_PERIOD, 1);
conf.setFloat("hbase.hregion.majorcompaction.jitter", 0);
store.compactionPolicy.updateConfiguration(conf, store);
try {
// trigger an aged major compaction
compactEquals(sfCreate(50,25,12,12), 50, 25, 12, 12);
// major sure exceeding maxCompactSize also downgrades aged minors
compactEquals(sfCreate(100,50,23,12,12), 23, 12, 12);
} finally {
conf.setLong(HConstants.MAJOR_COMPACTION_PERIOD, 1000*60*60*24);
conf.setFloat("hbase.hregion.majorcompaction.jitter", 0.20F);
}
/* REFERENCES == file is from a region that was split */
// treat storefiles that have references like a major compaction
compactEquals(sfCreate(true, 100,50,25,12,12), 100, 50, 25, 12, 12);
// reference files shouldn't obey max threshold
compactEquals(sfCreate(true, tooBig, 12,12), tooBig, 12, 12);
// reference files should obey max file compact to avoid OOM
compactEquals(sfCreate(true, 7, 6, 5, 4, 3, 2, 1), 7, 6, 5, 4, 3);
// empty case
compactEquals(new ArrayList<StoreFile>() /* empty */);
// empty case (because all files are too big)
compactEquals(sfCreate(tooBig, tooBig) /* empty */);
}
public void testOffPeakCompactionRatio() throws IOException {
/*
* NOTE: these tests are specific to describe the implementation of the
* current compaction algorithm. Developed to ensure that refactoring
* doesn't implicitly alter this.
*/
long tooBig = maxSize + 1;
Calendar calendar = new GregorianCalendar();
int hourOfDay = calendar.get(Calendar.HOUR_OF_DAY);
LOG.debug("Hour of day = " + hourOfDay);
int hourPlusOne = ((hourOfDay+1)%24);
int hourMinusOne = ((hourOfDay-1+24)%24);
int hourMinusTwo = ((hourOfDay-2+24)%24);
// check compact selection without peak hour setting
LOG.debug("Testing compact selection without off-peak settings...");
compactEquals(sfCreate(999,50,12,12,1), 12, 12, 1);
// set an off-peak compaction threshold
this.conf.setFloat("hbase.hstore.compaction.ratio.offpeak", 5.0F);
// set peak hour to current time and check compact selection
this.conf.setLong("hbase.offpeak.start.hour", hourMinusOne);
this.conf.setLong("hbase.offpeak.end.hour", hourPlusOne);
LOG.debug("Testing compact selection with off-peak settings (" +
hourMinusOne + ", " + hourPlusOne + ")");
store.compactionPolicy.updateConfiguration(this.conf, store);
compactEquals(sfCreate(999, 50, 12, 12, 1), 50, 12, 12, 1);
// set peak hour outside current selection and check compact selection
this.conf.setLong("hbase.offpeak.start.hour", hourMinusTwo);
this.conf.setLong("hbase.offpeak.end.hour", hourMinusOne);
store.compactionPolicy.updateConfiguration(this.conf, store);
LOG.debug("Testing compact selection with off-peak settings (" +
hourMinusTwo + ", " + hourMinusOne + ")");
compactEquals(sfCreate(999,50,12,12, 1), 12, 12, 1);
}
}

View File

@ -248,15 +248,17 @@ public class TestStore extends TestCase {
flush(i); flush(i);
} }
// after flush; check the lowest time stamp // after flush; check the lowest time stamp
long lowestTimeStampFromManager = StoreUtils.getLowestTimestamp(store.getStorefiles()); long lowestTimeStampFromStore =
long lowestTimeStampFromFS = getLowestTimeStampFromFS(fs, store.getStorefiles()); HStore.getLowestTimestamp(store.getStorefiles());
assertEquals(lowestTimeStampFromManager,lowestTimeStampFromFS); long lowestTimeStampFromFS =
getLowestTimeStampFromFS(fs,store.getStorefiles());
assertEquals(lowestTimeStampFromStore,lowestTimeStampFromFS);
// after compact; check the lowest time stamp // after compact; check the lowest time stamp
store.compact(store.requestCompaction()); store.compact(store.requestCompaction());
lowestTimeStampFromManager = StoreUtils.getLowestTimestamp(store.getStorefiles()); lowestTimeStampFromStore = HStore.getLowestTimestamp(store.getStorefiles());
lowestTimeStampFromFS = getLowestTimeStampFromFS(fs, store.getStorefiles()); lowestTimeStampFromFS = getLowestTimeStampFromFS(fs,store.getStorefiles());
assertEquals(lowestTimeStampFromManager, lowestTimeStampFromFS); assertEquals(lowestTimeStampFromStore,lowestTimeStampFromFS);
} }
private static long getLowestTimeStampFromFS(FileSystem fs, private static long getLowestTimeStampFromFS(FileSystem fs,