HBASE-7110 refactor the compaction selection and config code similarly to 0.89-fb changes; REAPPLY v9
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1414308 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
47e3e0940b
commit
9806433ee9
|
@ -4172,7 +4172,7 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
}
|
||||
|
||||
/**
|
||||
* @return True if needs a mojor compaction.
|
||||
* @return True if needs a major compaction.
|
||||
* @throws IOException
|
||||
*/
|
||||
boolean isMajorCompaction() throws IOException {
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
package org.apache.hadoop.hbase.regionserver;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.lang.reflect.InvocationTargetException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
|
@ -63,9 +64,7 @@ import org.apache.hadoop.hbase.io.hfile.HFileScanner;
|
|||
import org.apache.hadoop.hbase.io.hfile.InvalidHFileException;
|
||||
import org.apache.hadoop.hbase.io.hfile.NoOpDataBlockEncoder;
|
||||
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.CompactSelection;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.*;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.ChecksumType;
|
||||
import org.apache.hadoop.hbase.util.ClassSize;
|
||||
|
@ -75,8 +74,6 @@ import org.apache.hadoop.hbase.util.FSUtils;
|
|||
import org.apache.hadoop.util.StringUtils;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.base.Predicate;
|
||||
import com.google.common.collect.Collections2;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.Lists;
|
||||
|
||||
|
@ -103,8 +100,9 @@ import com.google.common.collect.Lists;
|
|||
* <p>Locking and transactions are handled at a higher level. This API should
|
||||
* not be called directly but by an HRegion manager.
|
||||
*/
|
||||
//TODO: move StoreConfiguration implementation into a separate class.
|
||||
@InterfaceAudience.Private
|
||||
public class HStore implements Store {
|
||||
public class HStore implements Store, StoreConfiguration {
|
||||
static final Log LOG = LogFactory.getLog(HStore.class);
|
||||
|
||||
protected final MemStore memstore;
|
||||
|
@ -112,15 +110,12 @@ public class HStore implements Store {
|
|||
private final Path homedir;
|
||||
private final HRegion region;
|
||||
private final HColumnDescriptor family;
|
||||
CompactionPolicy compactionPolicy;
|
||||
final FileSystem fs;
|
||||
final Configuration conf;
|
||||
final CacheConfig cacheConf;
|
||||
// ttl in milliseconds.
|
||||
// ttl in milliseconds. TODO: can this be removed? Already stored in scanInfo.
|
||||
private long ttl;
|
||||
private final int minFilesToCompact;
|
||||
private final int maxFilesToCompact;
|
||||
private final long minCompactSize;
|
||||
private final long maxCompactSize;
|
||||
private long lastCompactSize = 0;
|
||||
volatile boolean forceMajor = false;
|
||||
/* how many bytes to write between status checks */
|
||||
|
@ -193,7 +188,7 @@ public class HStore implements Store {
|
|||
|
||||
this.comparator = info.getComparator();
|
||||
// Get TTL
|
||||
this.ttl = getTTL(family);
|
||||
this.ttl = determineTTLFromFamily(family);
|
||||
// used by ScanQueryMatcher
|
||||
long timeToPurgeDeletes =
|
||||
Math.max(conf.getLong("hbase.hstore.time.to.purge.deletes", 0), 0);
|
||||
|
@ -204,23 +199,11 @@ public class HStore implements Store {
|
|||
scanInfo = new ScanInfo(family, ttl, timeToPurgeDeletes, this.comparator);
|
||||
this.memstore = new MemStore(conf, this.comparator);
|
||||
|
||||
// By default, compact if storefile.count >= minFilesToCompact
|
||||
this.minFilesToCompact = Math.max(2,
|
||||
conf.getInt("hbase.hstore.compaction.min",
|
||||
/*old name*/ conf.getInt("hbase.hstore.compactionThreshold", 3)));
|
||||
LOG.info("hbase.hstore.compaction.min = " + this.minFilesToCompact);
|
||||
|
||||
// Setting up cache configuration for this family
|
||||
this.cacheConf = new CacheConfig(conf, family);
|
||||
this.blockingStoreFileCount =
|
||||
conf.getInt("hbase.hstore.blockingStoreFiles", 7);
|
||||
|
||||
this.maxFilesToCompact = conf.getInt("hbase.hstore.compaction.max", 10);
|
||||
this.minCompactSize = conf.getLong("hbase.hstore.compaction.min.size",
|
||||
this.region.memstoreFlushSize);
|
||||
this.maxCompactSize
|
||||
= conf.getLong("hbase.hstore.compaction.max.size", Long.MAX_VALUE);
|
||||
|
||||
this.verifyBulkLoads = conf.getBoolean("hbase.hstore.bulkload.verify", false);
|
||||
|
||||
if (HStore.closeCheckInterval == 0) {
|
||||
|
@ -234,14 +217,16 @@ public class HStore implements Store {
|
|||
// initilize bytes per checksum
|
||||
this.bytesPerChecksum = getBytesPerChecksum(conf);
|
||||
// Create a compaction tool instance
|
||||
this.compactor = new Compactor(this.conf);
|
||||
this.compactor = new Compactor(conf);
|
||||
// Create a compaction manager.
|
||||
this.compactionPolicy = new CompactionPolicy(conf, this);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param family
|
||||
* @return
|
||||
*/
|
||||
long getTTL(final HColumnDescriptor family) {
|
||||
private static long determineTTLFromFamily(final HColumnDescriptor family) {
|
||||
// HCD.getTimeToLive returns ttl in seconds. Convert to milliseconds.
|
||||
long ttl = family.getTimeToLive();
|
||||
if (ttl == HConstants.FOREVER) {
|
||||
|
@ -285,6 +270,22 @@ public class HStore implements Store {
|
|||
return this.fs;
|
||||
}
|
||||
|
||||
/* Implementation of StoreConfiguration */
|
||||
public long getStoreFileTtl() {
|
||||
// TTL only applies if there's no MIN_VERSIONs setting on the column.
|
||||
return (this.scanInfo.getMinVersions() == 0) ? this.ttl : Long.MAX_VALUE;
|
||||
}
|
||||
|
||||
public Long getMajorCompactionPeriod() {
|
||||
String strCompactionTime = this.family.getValue(HConstants.MAJOR_COMPACTION_PERIOD);
|
||||
return (strCompactionTime != null) ? new Long(strCompactionTime) : null;
|
||||
}
|
||||
|
||||
public long getMemstoreFlushSize() {
|
||||
return this.region.memstoreFlushSize;
|
||||
}
|
||||
/* End implementation of StoreConfiguration */
|
||||
|
||||
/**
|
||||
* Returns the configured bytesPerChecksum value.
|
||||
* @param conf The configuration
|
||||
|
@ -352,7 +353,8 @@ public class HStore implements Store {
|
|||
* @param family family name of this store
|
||||
* @return Path to the family/Store home directory
|
||||
*/
|
||||
public static Path getStoreHomedir(final Path parentRegionDirectory, final byte[] family) {
|
||||
public static Path getStoreHomedir(final Path parentRegionDirectory,
|
||||
final byte[] family) {
|
||||
return new Path(parentRegionDirectory, new Path(Bytes.toString(family)));
|
||||
}
|
||||
/**
|
||||
|
@ -566,7 +568,8 @@ public class HStore implements Store {
|
|||
"the destination store. Copying file over to destination filesystem.");
|
||||
Path tmpPath = getTmpPath();
|
||||
FileUtil.copy(srcFs, srcPath, fs, tmpPath, false, conf);
|
||||
LOG.info("Copied " + srcPath + " to temporary path on destination filesystem: " + tmpPath);
|
||||
LOG.info("Copied " + srcPath
|
||||
+ " to temporary path on destination filesystem: " + tmpPath);
|
||||
srcPath = tmpPath;
|
||||
}
|
||||
|
||||
|
@ -663,8 +666,8 @@ public class HStore implements Store {
|
|||
|
||||
/**
|
||||
* Snapshot this stores memstore. Call before running
|
||||
* {@link #flushCache(long, SortedSet, TimeRangeTracker, AtomicLong, MonitoredTask)} so it has
|
||||
* some work to do.
|
||||
* {@link #flushCache(long, SortedSet, TimeRangeTracker, AtomicLong, MonitoredTask)}
|
||||
* so it has some work to do.
|
||||
*/
|
||||
void snapshot() {
|
||||
this.memstore.snapshot();
|
||||
|
@ -722,7 +725,8 @@ public class HStore implements Store {
|
|||
InternalScanner scanner = null;
|
||||
KeyValueScanner memstoreScanner = new CollectionBackedScanner(set, this.comparator);
|
||||
if (getHRegion().getCoprocessorHost() != null) {
|
||||
scanner = getHRegion().getCoprocessorHost().preFlushScannerOpen(this, memstoreScanner);
|
||||
scanner = getHRegion().getCoprocessorHost()
|
||||
.preFlushScannerOpen(this, memstoreScanner);
|
||||
}
|
||||
if (scanner == null) {
|
||||
Scan scan = new Scan();
|
||||
|
@ -759,7 +763,8 @@ public class HStore implements Store {
|
|||
if (!kvs.isEmpty()) {
|
||||
for (KeyValue kv : kvs) {
|
||||
// If we know that this KV is going to be included always, then let us
|
||||
// set its memstoreTS to 0. This will help us save space when writing to disk.
|
||||
// set its memstoreTS to 0. This will help us save space when writing to
|
||||
// disk.
|
||||
if (kv.getMemstoreTS() <= smallestReadPoint) {
|
||||
// let us not change the original KV. It could be in the memstore
|
||||
// changing its memstoreTS could affect other threads/scanners.
|
||||
|
@ -774,7 +779,8 @@ public class HStore implements Store {
|
|||
} while (hasMore);
|
||||
} finally {
|
||||
// Write out the log sequence number that corresponds to this output
|
||||
// hfile. The hfile is current up to and including logCacheFlushId.
|
||||
// hfile. Also write current time in metadata as minFlushTime.
|
||||
// The hfile is current up to and including logCacheFlushId.
|
||||
status.setStatus("Flushing " + this + ": appending metadata");
|
||||
writer.appendMetadata(logCacheFlushId, false);
|
||||
status.setStatus("Flushing " + this + ": closing flushed file");
|
||||
|
@ -1004,12 +1010,12 @@ public class HStore implements Store {
|
|||
|
||||
// Ready to go. Have list of files to compact.
|
||||
LOG.info("Starting compaction of " + filesToCompact.size() + " file(s) in "
|
||||
+ this + " of "
|
||||
+ this.region.getRegionInfo().getRegionNameAsString()
|
||||
+ this + " of " + this.region.getRegionInfo().getRegionNameAsString()
|
||||
+ " into tmpdir=" + region.getTmpDir() + ", seqid=" + maxId + ", totalSize="
|
||||
+ StringUtils.humanReadableInt(cr.getSize()));
|
||||
|
||||
StoreFile sf = null;
|
||||
long compactionStartTime = EnvironmentEdgeManager.currentTimeMillis();
|
||||
try {
|
||||
StoreFile.Writer writer =
|
||||
this.compactor.compact(this, filesToCompact, cr.isMajor(), maxId);
|
||||
|
@ -1031,6 +1037,7 @@ public class HStore implements Store {
|
|||
}
|
||||
}
|
||||
|
||||
long now = EnvironmentEdgeManager.currentTimeMillis();
|
||||
LOG.info("Completed" + (cr.isMajor() ? " major " : " ") + "compaction of "
|
||||
+ filesToCompact.size() + " file(s) in " + this + " of "
|
||||
+ this.region.getRegionInfo().getRegionNameAsString()
|
||||
|
@ -1038,8 +1045,11 @@ public class HStore implements Store {
|
|||
(sf == null ? "none" : sf.getPath().getName()) +
|
||||
", size=" + (sf == null ? "none" :
|
||||
StringUtils.humanReadableInt(sf.getReader().length()))
|
||||
+ "; total size for store is "
|
||||
+ StringUtils.humanReadableInt(storeSize));
|
||||
+ "; total size for store is " + StringUtils.humanReadableInt(storeSize)
|
||||
+ ". This selection was in queue for "
|
||||
+ StringUtils.formatTimeDiff(compactionStartTime, cr.getSelectionTime())
|
||||
+ ", and took " + StringUtils.formatTimeDiff(now, compactionStartTime)
|
||||
+ " to execute.");
|
||||
return sf;
|
||||
}
|
||||
|
||||
|
@ -1094,38 +1104,7 @@ public class HStore implements Store {
|
|||
|
||||
@Override
|
||||
public boolean hasReferences() {
|
||||
return hasReferences(this.storefiles);
|
||||
}
|
||||
|
||||
/*
|
||||
* @param files
|
||||
* @return True if any of the files in <code>files</code> are References.
|
||||
*/
|
||||
private boolean hasReferences(Collection<StoreFile> files) {
|
||||
if (files != null && files.size() > 0) {
|
||||
for (StoreFile hsf: files) {
|
||||
if (hsf.isReference()) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
/*
|
||||
* Gets lowest timestamp from candidate StoreFiles
|
||||
*
|
||||
* @param fs
|
||||
* @param dir
|
||||
* @throws IOException
|
||||
*/
|
||||
public static long getLowestTimestamp(final List<StoreFile> candidates)
|
||||
throws IOException {
|
||||
long minTs = Long.MAX_VALUE;
|
||||
for (StoreFile storeFile : candidates) {
|
||||
minTs = Math.min(minTs, storeFile.getModificationTimeStamp());
|
||||
}
|
||||
return minTs;
|
||||
return StoreUtils.hasReferences(this.storefiles);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -1143,91 +1122,7 @@ public class HStore implements Store {
|
|||
}
|
||||
|
||||
List<StoreFile> candidates = new ArrayList<StoreFile>(this.storefiles);
|
||||
|
||||
// exclude files above the max compaction threshold
|
||||
// except: save all references. we MUST compact them
|
||||
int pos = 0;
|
||||
while (pos < candidates.size() &&
|
||||
candidates.get(pos).getReader().length() > this.maxCompactSize &&
|
||||
!candidates.get(pos).isReference()) ++pos;
|
||||
candidates.subList(0, pos).clear();
|
||||
|
||||
return isMajorCompaction(candidates);
|
||||
}
|
||||
|
||||
/*
|
||||
* @param filesToCompact Files to compact. Can be null.
|
||||
* @return True if we should run a major compaction.
|
||||
*/
|
||||
private boolean isMajorCompaction(final List<StoreFile> filesToCompact) throws IOException {
|
||||
boolean result = false;
|
||||
long mcTime = getNextMajorCompactTime();
|
||||
if (filesToCompact == null || filesToCompact.isEmpty() || mcTime == 0) {
|
||||
return result;
|
||||
}
|
||||
// TODO: Use better method for determining stamp of last major (HBASE-2990)
|
||||
long lowTimestamp = getLowestTimestamp(filesToCompact);
|
||||
long now = System.currentTimeMillis();
|
||||
if (lowTimestamp > 0l && lowTimestamp < (now - mcTime)) {
|
||||
// Major compaction time has elapsed.
|
||||
if (filesToCompact.size() == 1) {
|
||||
// Single file
|
||||
StoreFile sf = filesToCompact.get(0);
|
||||
long oldest =
|
||||
(sf.getReader().timeRangeTracker == null) ?
|
||||
Long.MIN_VALUE :
|
||||
now - sf.getReader().timeRangeTracker.minimumTimestamp;
|
||||
if (sf.isMajorCompaction() &&
|
||||
(this.ttl == HConstants.FOREVER || oldest < this.ttl)) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Skipping major compaction of " + this +
|
||||
" because one (major) compacted file only and oldestTime " +
|
||||
oldest + "ms is < ttl=" + this.ttl);
|
||||
}
|
||||
} else if (this.ttl != HConstants.FOREVER && oldest > this.ttl) {
|
||||
LOG.debug("Major compaction triggered on store " + this +
|
||||
", because keyvalues outdated; time since last major compaction " +
|
||||
(now - lowTimestamp) + "ms");
|
||||
result = true;
|
||||
}
|
||||
} else {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Major compaction triggered on store " + this +
|
||||
"; time since last major compaction " + (now - lowTimestamp) + "ms");
|
||||
}
|
||||
result = true;
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
long getNextMajorCompactTime() {
|
||||
// default = 24hrs
|
||||
long ret = conf.getLong(HConstants.MAJOR_COMPACTION_PERIOD, 1000*60*60*24);
|
||||
if (family.getValue(HConstants.MAJOR_COMPACTION_PERIOD) != null) {
|
||||
String strCompactionTime =
|
||||
family.getValue(HConstants.MAJOR_COMPACTION_PERIOD);
|
||||
ret = (new Long(strCompactionTime)).longValue();
|
||||
}
|
||||
|
||||
if (ret > 0) {
|
||||
// default = 20% = +/- 4.8 hrs
|
||||
double jitterPct = conf.getFloat("hbase.hregion.majorcompaction.jitter",
|
||||
0.20F);
|
||||
if (jitterPct > 0) {
|
||||
long jitter = Math.round(ret * jitterPct);
|
||||
// deterministic jitter avoids a major compaction storm on restart
|
||||
ImmutableList<StoreFile> snapshot = storefiles;
|
||||
if (snapshot != null && !snapshot.isEmpty()) {
|
||||
String seed = snapshot.get(0).getPath().getName();
|
||||
double curRand = new Random(seed.hashCode()).nextDouble();
|
||||
ret += jitter - Math.round(2L * jitter * curRand);
|
||||
} else {
|
||||
ret = 0; // no storefiles == no major compaction
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
return compactionPolicy.isMajorCompaction(candidates);
|
||||
}
|
||||
|
||||
public CompactionRequest requestCompaction() throws IOException {
|
||||
|
@ -1263,9 +1158,11 @@ public class HStore implements Store {
|
|||
CompactSelection filesToCompact;
|
||||
if (override) {
|
||||
// coprocessor is overriding normal file selection
|
||||
filesToCompact = new CompactSelection(conf, candidates);
|
||||
filesToCompact = new CompactSelection(candidates);
|
||||
} else {
|
||||
filesToCompact = compactSelection(candidates, priority);
|
||||
boolean isUserCompaction = priority == Store.PRIORITY_USER;
|
||||
filesToCompact = compactionPolicy.selectCompaction(candidates, isUserCompaction,
|
||||
forceMajor && filesCompacting.isEmpty());
|
||||
}
|
||||
|
||||
if (region.getCoprocessorHost() != null) {
|
||||
|
@ -1288,12 +1185,17 @@ public class HStore implements Store {
|
|||
Collections.sort(filesCompacting, StoreFile.Comparators.SEQ_ID);
|
||||
|
||||
// major compaction iff all StoreFiles are included
|
||||
boolean isMajor = (filesToCompact.getFilesToCompact().size() == this.storefiles.size());
|
||||
boolean isMajor =
|
||||
(filesToCompact.getFilesToCompact().size() == this.storefiles.size());
|
||||
if (isMajor) {
|
||||
// since we're enqueuing a major, update the compaction wait interval
|
||||
this.forceMajor = false;
|
||||
}
|
||||
|
||||
LOG.debug(getHRegion().regionInfo.getEncodedName() + " - " +
|
||||
getColumnFamilyName() + ": Initiating " +
|
||||
(isMajor ? "major" : "minor") + " compaction");
|
||||
|
||||
// everything went better than expected. create a compaction request
|
||||
int pri = getCompactPriority(priority);
|
||||
ret = new CompactionRequest(region, this, filesToCompact, isMajor, pri);
|
||||
|
@ -1315,191 +1217,6 @@ public class HStore implements Store {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Algorithm to choose which files to compact, see {@link #compactSelection(java.util.List, int)}
|
||||
* @param candidates
|
||||
* @return
|
||||
* @throws IOException
|
||||
*/
|
||||
CompactSelection compactSelection(List<StoreFile> candidates) throws IOException {
|
||||
return compactSelection(candidates,Store.NO_PRIORITY);
|
||||
}
|
||||
|
||||
/**
|
||||
* Algorithm to choose which files to compact
|
||||
*
|
||||
* Configuration knobs:
|
||||
* "hbase.hstore.compaction.ratio"
|
||||
* normal case: minor compact when file <= sum(smaller_files) * ratio
|
||||
* "hbase.hstore.compaction.min.size"
|
||||
* unconditionally compact individual files below this size
|
||||
* "hbase.hstore.compaction.max.size"
|
||||
* never compact individual files above this size (unless splitting)
|
||||
* "hbase.hstore.compaction.min"
|
||||
* min files needed to minor compact
|
||||
* "hbase.hstore.compaction.max"
|
||||
* max files to compact at once (avoids OOM)
|
||||
*
|
||||
* @param candidates candidate files, ordered from oldest to newest
|
||||
* @return subset copy of candidate list that meets compaction criteria
|
||||
* @throws IOException
|
||||
*/
|
||||
CompactSelection compactSelection(List<StoreFile> candidates, int priority)
|
||||
throws IOException {
|
||||
// ASSUMPTION!!! filesCompacting is locked when calling this function
|
||||
|
||||
/* normal skew:
|
||||
*
|
||||
* older ----> newer
|
||||
* _
|
||||
* | | _
|
||||
* | | | | _
|
||||
* --|-|- |-|- |-|---_-------_------- minCompactSize
|
||||
* | | | | | | | | _ | |
|
||||
* | | | | | | | | | | | |
|
||||
* | | | | | | | | | | | |
|
||||
*/
|
||||
CompactSelection compactSelection = new CompactSelection(conf, candidates);
|
||||
|
||||
boolean forcemajor = this.forceMajor && filesCompacting.isEmpty();
|
||||
if (!forcemajor) {
|
||||
// Delete the expired store files before the compaction selection.
|
||||
if (conf.getBoolean("hbase.store.delete.expired.storefile", true)
|
||||
&& (ttl != Long.MAX_VALUE) && (this.scanInfo.minVersions == 0)) {
|
||||
CompactSelection expiredSelection = compactSelection
|
||||
.selectExpiredStoreFilesToCompact(
|
||||
EnvironmentEdgeManager.currentTimeMillis() - this.ttl);
|
||||
|
||||
// If there is any expired store files, delete them by compaction.
|
||||
if (expiredSelection != null) {
|
||||
return expiredSelection;
|
||||
}
|
||||
}
|
||||
// do not compact old files above a configurable threshold
|
||||
// save all references. we MUST compact them
|
||||
int pos = 0;
|
||||
while (pos < compactSelection.getFilesToCompact().size() &&
|
||||
compactSelection.getFilesToCompact().get(pos).getReader().length()
|
||||
> maxCompactSize &&
|
||||
!compactSelection.getFilesToCompact().get(pos).isReference()) ++pos;
|
||||
if (pos != 0) compactSelection.clearSubList(0, pos);
|
||||
}
|
||||
|
||||
if (compactSelection.getFilesToCompact().isEmpty()) {
|
||||
LOG.debug(this.getHRegionInfo().getEncodedName() + " - " +
|
||||
this + ": no store files to compact");
|
||||
compactSelection.emptyFileList();
|
||||
return compactSelection;
|
||||
}
|
||||
|
||||
// Force a major compaction if this is a user-requested major compaction,
|
||||
// or if we do not have too many files to compact and this was requested
|
||||
// as a major compaction
|
||||
boolean majorcompaction = (forcemajor && priority == Store.PRIORITY_USER) ||
|
||||
(forcemajor || isMajorCompaction(compactSelection.getFilesToCompact())) &&
|
||||
(compactSelection.getFilesToCompact().size() < this.maxFilesToCompact
|
||||
);
|
||||
LOG.debug(this.getHRegionInfo().getEncodedName() + " - "
|
||||
+ this.getColumnFamilyName() + ": Initiating " +
|
||||
(majorcompaction ? "major" : "minor") + "compaction");
|
||||
|
||||
if (!majorcompaction &&
|
||||
!hasReferences(compactSelection.getFilesToCompact())) {
|
||||
// we're doing a minor compaction, let's see what files are applicable
|
||||
int start = 0;
|
||||
double r = compactSelection.getCompactSelectionRatio();
|
||||
|
||||
// remove bulk import files that request to be excluded from minors
|
||||
compactSelection.getFilesToCompact().removeAll(Collections2.filter(
|
||||
compactSelection.getFilesToCompact(),
|
||||
new Predicate<StoreFile>() {
|
||||
public boolean apply(StoreFile input) {
|
||||
return input.excludeFromMinorCompaction();
|
||||
}
|
||||
}));
|
||||
|
||||
// skip selection algorithm if we don't have enough files
|
||||
if (compactSelection.getFilesToCompact().size() < this.minFilesToCompact) {
|
||||
if(LOG.isDebugEnabled()) {
|
||||
LOG.debug("Not compacting files because we only have " +
|
||||
compactSelection.getFilesToCompact().size() +
|
||||
" files ready for compaction. Need " + this.minFilesToCompact + " to initiate.");
|
||||
}
|
||||
compactSelection.emptyFileList();
|
||||
return compactSelection;
|
||||
}
|
||||
|
||||
/* TODO: add sorting + unit test back in when HBASE-2856 is fixed
|
||||
// Sort files by size to correct when normal skew is altered by bulk load.
|
||||
Collections.sort(filesToCompact, StoreFile.Comparators.FILE_SIZE);
|
||||
*/
|
||||
|
||||
// get store file sizes for incremental compacting selection.
|
||||
int countOfFiles = compactSelection.getFilesToCompact().size();
|
||||
long [] fileSizes = new long[countOfFiles];
|
||||
long [] sumSize = new long[countOfFiles];
|
||||
for (int i = countOfFiles-1; i >= 0; --i) {
|
||||
StoreFile file = compactSelection.getFilesToCompact().get(i);
|
||||
fileSizes[i] = file.getReader().length();
|
||||
// calculate the sum of fileSizes[i,i+maxFilesToCompact-1) for algo
|
||||
int tooFar = i + this.maxFilesToCompact - 1;
|
||||
sumSize[i] = fileSizes[i]
|
||||
+ ((i+1 < countOfFiles) ? sumSize[i+1] : 0)
|
||||
- ((tooFar < countOfFiles) ? fileSizes[tooFar] : 0);
|
||||
}
|
||||
|
||||
/* Start at the oldest file and stop when you find the first file that
|
||||
* meets compaction criteria:
|
||||
* (1) a recently-flushed, small file (i.e. <= minCompactSize)
|
||||
* OR
|
||||
* (2) within the compactRatio of sum(newer_files)
|
||||
* Given normal skew, any newer files will also meet this criteria
|
||||
*
|
||||
* Additional Note:
|
||||
* If fileSizes.size() >> maxFilesToCompact, we will recurse on
|
||||
* compact(). Consider the oldest files first to avoid a
|
||||
* situation where we always compact [end-threshold,end). Then, the
|
||||
* last file becomes an aggregate of the previous compactions.
|
||||
*/
|
||||
while(countOfFiles - start >= this.minFilesToCompact &&
|
||||
fileSizes[start] >
|
||||
Math.max(minCompactSize, (long)(sumSize[start+1] * r))) {
|
||||
++start;
|
||||
}
|
||||
int end = Math.min(countOfFiles, start + this.maxFilesToCompact);
|
||||
long totalSize = fileSizes[start]
|
||||
+ ((start+1 < countOfFiles) ? sumSize[start+1] : 0);
|
||||
compactSelection = compactSelection.getSubList(start, end);
|
||||
|
||||
// if we don't have enough files to compact, just wait
|
||||
if (compactSelection.getFilesToCompact().size() < this.minFilesToCompact) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Skipped compaction of " + this
|
||||
+ ". Only " + (end - start) + " file(s) of size "
|
||||
+ StringUtils.humanReadableInt(totalSize)
|
||||
+ " have met compaction criteria.");
|
||||
}
|
||||
compactSelection.emptyFileList();
|
||||
return compactSelection;
|
||||
}
|
||||
} else {
|
||||
if(majorcompaction) {
|
||||
if (compactSelection.getFilesToCompact().size() > this.maxFilesToCompact) {
|
||||
LOG.debug("Warning, compacting more than " + this.maxFilesToCompact +
|
||||
" files, probably because of a user-requested major compaction");
|
||||
if(priority != Store.PRIORITY_USER) {
|
||||
LOG.error("Compacting more than max files on a non user-requested compaction");
|
||||
}
|
||||
}
|
||||
} else if (compactSelection.getFilesToCompact().size() > this.maxFilesToCompact) {
|
||||
// all files included in this compaction, up to max
|
||||
int pastMax = compactSelection.getFilesToCompact().size() - this.maxFilesToCompact;
|
||||
compactSelection.getFilesToCompact().subList(0, pastMax).clear();
|
||||
}
|
||||
}
|
||||
return compactSelection;
|
||||
}
|
||||
|
||||
/**
|
||||
* Validates a store file by opening and closing it. In HFileV2 this should
|
||||
* not be an expensive operation.
|
||||
|
@ -1597,8 +1314,8 @@ public class HStore implements Store {
|
|||
|
||||
// let the archive util decide if we should archive or delete the files
|
||||
LOG.debug("Removing store files after compaction...");
|
||||
HFileArchiver.archiveStoreFiles(this.fs, this.region, this.conf, this.family.getName(),
|
||||
compactedFiles);
|
||||
HFileArchiver.archiveStoreFiles(this.fs, this.region, this.conf,
|
||||
this.family.getName(), compactedFiles);
|
||||
|
||||
} catch (IOException e) {
|
||||
e = RemoteExceptionHandler.checkIOException(e);
|
||||
|
@ -2005,11 +1722,7 @@ public class HStore implements Store {
|
|||
|
||||
@Override
|
||||
public boolean throttleCompaction(long compactionSize) {
|
||||
// see HBASE-5867 for discussion on the default
|
||||
long throttlePoint = conf.getLong(
|
||||
"hbase.regionserver.thread.compaction.throttle",
|
||||
2 * this.minFilesToCompact * this.region.memstoreFlushSize);
|
||||
return compactionSize > throttlePoint;
|
||||
return compactionPolicy.throttleCompaction(compactionSize);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -2115,7 +1828,7 @@ public class HStore implements Store {
|
|||
|
||||
@Override
|
||||
public boolean needsCompaction() {
|
||||
return (storefiles.size() - filesCompacting.size()) > minFilesToCompact;
|
||||
return compactionPolicy.needsCompaction(storefiles.size() - filesCompacting.size());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -2124,8 +1837,8 @@ public class HStore implements Store {
|
|||
}
|
||||
|
||||
public static final long FIXED_OVERHEAD =
|
||||
ClassSize.align((19 * ClassSize.REFERENCE) + (6 * Bytes.SIZEOF_LONG)
|
||||
+ (5 * Bytes.SIZEOF_INT) + Bytes.SIZEOF_BOOLEAN);
|
||||
ClassSize.align((20 * ClassSize.REFERENCE) + (4 * Bytes.SIZEOF_LONG)
|
||||
+ (3 * Bytes.SIZEOF_INT) + Bytes.SIZEOF_BOOLEAN);
|
||||
|
||||
public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD
|
||||
+ ClassSize.OBJECT + ClassSize.REENTRANT_LOCK
|
||||
|
|
|
@ -0,0 +1,50 @@
|
|||
/**
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.regionserver;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
|
||||
/**
|
||||
* The class that contains shared information about various knobs of a Store/HStore object.
|
||||
* Unlike the configuration objects that merely return the XML values, the implementations
|
||||
* should return ready-to-use applicable values for corresponding calls, after all the
|
||||
* parsing/validation/adjustment for other considerations, so that we don't have to repeat
|
||||
* this logic in multiple places.
|
||||
* TODO: move methods and logic here as necessary.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
@InterfaceStability.Unstable
|
||||
public interface StoreConfiguration {
|
||||
/**
|
||||
* Gets the cf-specific major compaction period.
|
||||
*/
|
||||
public Long getMajorCompactionPeriod();
|
||||
|
||||
|
||||
/**
|
||||
* Gets the Memstore flush size for the region that this store works with.
|
||||
*/
|
||||
public long getMemstoreFlushSize();
|
||||
|
||||
/**
|
||||
* Gets the cf-specific time-to-live for store files.
|
||||
*/
|
||||
public long getStoreFileTtl();
|
||||
}
|
|
@ -65,6 +65,7 @@ import org.apache.hadoop.hbase.util.BloomFilter;
|
|||
import org.apache.hadoop.hbase.util.BloomFilterFactory;
|
||||
import org.apache.hadoop.hbase.util.BloomFilterWriter;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
import org.apache.hadoop.hbase.util.FSUtils;
|
||||
import org.apache.hadoop.hbase.util.Writables;
|
||||
import org.apache.hadoop.io.RawComparator;
|
||||
|
@ -295,7 +296,7 @@ public class StoreFile {
|
|||
* @return True if this is a StoreFile Reference; call after {@link #open()}
|
||||
* else may get wrong answer.
|
||||
*/
|
||||
boolean isReference() {
|
||||
public boolean isReference() {
|
||||
return this.reference != null;
|
||||
}
|
||||
|
||||
|
@ -357,7 +358,7 @@ public class StoreFile {
|
|||
/**
|
||||
* @return True if this file was made by a major compaction.
|
||||
*/
|
||||
boolean isMajorCompaction() {
|
||||
public boolean isMajorCompaction() {
|
||||
if (this.majorCompaction == null) {
|
||||
throw new NullPointerException("This has not been set yet");
|
||||
}
|
||||
|
@ -367,7 +368,7 @@ public class StoreFile {
|
|||
/**
|
||||
* @return True if this file should not be part of a minor compaction.
|
||||
*/
|
||||
boolean excludeFromMinorCompaction() {
|
||||
public boolean excludeFromMinorCompaction() {
|
||||
return this.excludeFromMinorCompaction;
|
||||
}
|
||||
|
||||
|
@ -579,7 +580,6 @@ public class StoreFile {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
this.reader.setSequenceID(this.sequenceid);
|
||||
|
||||
b = metadataMap.get(HFileWriterV2.MAX_MEMSTORE_TS_KEY);
|
||||
|
@ -945,6 +945,11 @@ public class StoreFile {
|
|||
return r.write(fs, p);
|
||||
}
|
||||
|
||||
public Long getMinimumTimestamp() {
|
||||
return (getReader().timeRangeTracker == null) ?
|
||||
null :
|
||||
getReader().timeRangeTracker.minimumTimestamp;
|
||||
}
|
||||
|
||||
/**
|
||||
* A StoreFile writer. Use this to read/write HBase Store Files. It is package
|
||||
|
|
|
@ -0,0 +1,64 @@
|
|||
/**
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.regionserver;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* Utility functions for region server storage layer.
|
||||
*/
|
||||
public class StoreUtils {
|
||||
/**
|
||||
* Creates a deterministic hash code for store file collection.
|
||||
*/
|
||||
public static Integer getDeterministicRandomSeed(final List<StoreFile> files) {
|
||||
if (files != null && !files.isEmpty()) {
|
||||
return files.get(0).getPath().getName().hashCode();
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Determines whether any files in the collection are references.
|
||||
*/
|
||||
public static boolean hasReferences(final Collection<StoreFile> files) {
|
||||
if (files != null && files.size() > 0) {
|
||||
for (StoreFile hsf: files) {
|
||||
if (hsf.isReference()) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets lowest timestamp from candidate StoreFiles
|
||||
*/
|
||||
public static long getLowestTimestamp(final List<StoreFile> candidates)
|
||||
throws IOException {
|
||||
long minTs = Long.MAX_VALUE;
|
||||
for (StoreFile storeFile : candidates) {
|
||||
minTs = Math.min(minTs, storeFile.getModificationTimeStamp());
|
||||
}
|
||||
return minTs;
|
||||
}
|
||||
}
|
|
@ -19,15 +19,13 @@
|
|||
package org.apache.hadoop.hbase.regionserver.compactions;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Calendar;
|
||||
import java.util.GregorianCalendar;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.regionserver.StoreFile;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
|
||||
@InterfaceAudience.Private
|
||||
public class CompactSelection {
|
||||
|
@ -48,37 +46,15 @@ public class CompactSelection {
|
|||
*/
|
||||
private final static Object compactionCountLock = new Object();
|
||||
|
||||
// HBase conf object
|
||||
Configuration conf;
|
||||
// was this compaction promoted to an off-peak
|
||||
boolean isOffPeakCompaction = false;
|
||||
// compactRatio: double on purpose! Float.MAX < Long.MAX < Double.MAX
|
||||
// With float, java will downcast your long to float for comparisons (bad)
|
||||
private double compactRatio;
|
||||
// compaction ratio off-peak
|
||||
private double compactRatioOffPeak;
|
||||
// offpeak start time
|
||||
private int offPeakStartHour = -1;
|
||||
// off peak end time
|
||||
private int offPeakEndHour = -1;
|
||||
// CompactSelection object creation time.
|
||||
private final long selectionTime;
|
||||
|
||||
public CompactSelection(Configuration conf, List<StoreFile> filesToCompact) {
|
||||
public CompactSelection(List<StoreFile> filesToCompact) {
|
||||
this.selectionTime = EnvironmentEdgeManager.currentTimeMillis();
|
||||
this.filesToCompact = filesToCompact;
|
||||
this.conf = conf;
|
||||
this.compactRatio = conf.getFloat("hbase.hstore.compaction.ratio", 1.2F);
|
||||
this.compactRatioOffPeak = conf.getFloat("hbase.hstore.compaction.ratio.offpeak", 5.0F);
|
||||
|
||||
// Peak time is from [offPeakStartHour, offPeakEndHour). Valid numbers are [0, 23]
|
||||
this.offPeakStartHour = conf.getInt("hbase.offpeak.start.hour", -1);
|
||||
this.offPeakEndHour = conf.getInt("hbase.offpeak.end.hour", -1);
|
||||
if (!isValidHour(this.offPeakStartHour) || !isValidHour(this.offPeakEndHour)) {
|
||||
if (!(this.offPeakStartHour == -1 && this.offPeakEndHour == -1)) {
|
||||
LOG.warn("Invalid start/end hour for peak hour : start = " +
|
||||
this.offPeakStartHour + " end = " + this.offPeakEndHour +
|
||||
". Valid numbers are [0-23]");
|
||||
}
|
||||
this.offPeakStartHour = this.offPeakEndHour = -1;
|
||||
}
|
||||
this.isOffPeakCompaction = false;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -113,49 +89,25 @@ public class CompactSelection {
|
|||
}
|
||||
|
||||
if (hasExpiredStoreFiles) {
|
||||
expiredSFSelection = new CompactSelection(conf, expiredStoreFiles);
|
||||
expiredSFSelection = new CompactSelection(expiredStoreFiles);
|
||||
}
|
||||
return expiredSFSelection;
|
||||
}
|
||||
|
||||
/**
|
||||
* If the current hour falls in the off peak times and there are no
|
||||
* outstanding off peak compactions, the current compaction is
|
||||
* promoted to an off peak compaction. Currently only one off peak
|
||||
* compaction is present in the compaction queue.
|
||||
*
|
||||
* @param currentHour
|
||||
* @return
|
||||
*/
|
||||
public double getCompactSelectionRatio() {
|
||||
double r = this.compactRatio;
|
||||
synchronized(compactionCountLock) {
|
||||
if (isOffPeakHour() && numOutstandingOffPeakCompactions == 0) {
|
||||
r = this.compactRatioOffPeak;
|
||||
numOutstandingOffPeakCompactions++;
|
||||
isOffPeakCompaction = true;
|
||||
}
|
||||
}
|
||||
if(isOffPeakCompaction) {
|
||||
LOG.info("Running an off-peak compaction, selection ratio = " +
|
||||
compactRatioOffPeak + ", numOutstandingOffPeakCompactions is now " +
|
||||
numOutstandingOffPeakCompactions);
|
||||
}
|
||||
return r;
|
||||
}
|
||||
|
||||
/**
|
||||
* The current compaction finished, so reset the off peak compactions count
|
||||
* if this was an off peak compaction.
|
||||
*/
|
||||
public void finishRequest() {
|
||||
if (isOffPeakCompaction) {
|
||||
long newValueToLog = -1;
|
||||
synchronized(compactionCountLock) {
|
||||
numOutstandingOffPeakCompactions--;
|
||||
assert !isOffPeakCompaction : "Double-counting off-peak count for compaction";
|
||||
newValueToLog = --numOutstandingOffPeakCompactions;
|
||||
isOffPeakCompaction = false;
|
||||
}
|
||||
LOG.info("Compaction done, numOutstandingOffPeakCompactions is now " +
|
||||
numOutstandingOffPeakCompactions);
|
||||
newValueToLog);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -170,13 +122,14 @@ public class CompactSelection {
|
|||
public void emptyFileList() {
|
||||
filesToCompact.clear();
|
||||
if (isOffPeakCompaction) {
|
||||
long newValueToLog = -1;
|
||||
synchronized(compactionCountLock) {
|
||||
// reset the off peak count
|
||||
numOutstandingOffPeakCompactions--;
|
||||
newValueToLog = --numOutstandingOffPeakCompactions;
|
||||
isOffPeakCompaction = false;
|
||||
}
|
||||
LOG.info("Nothing to compact, numOutstandingOffPeakCompactions is now " +
|
||||
numOutstandingOffPeakCompactions);
|
||||
newValueToLog);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -184,16 +137,30 @@ public class CompactSelection {
|
|||
return this.isOffPeakCompaction;
|
||||
}
|
||||
|
||||
private boolean isOffPeakHour() {
|
||||
int currentHour = (new GregorianCalendar()).get(Calendar.HOUR_OF_DAY);
|
||||
// If offpeak time checking is disabled just return false.
|
||||
if (this.offPeakStartHour == this.offPeakEndHour) {
|
||||
return false;
|
||||
public static long getNumOutStandingOffPeakCompactions() {
|
||||
synchronized(compactionCountLock) {
|
||||
return numOutstandingOffPeakCompactions;
|
||||
}
|
||||
if (this.offPeakStartHour < this.offPeakEndHour) {
|
||||
return (currentHour >= this.offPeakStartHour && currentHour < this.offPeakEndHour);
|
||||
}
|
||||
|
||||
/**
|
||||
* Tries making the compaction off-peak.
|
||||
* Only checks internal compaction constraints, not timing.
|
||||
* @return Eventual value of isOffPeakCompaction.
|
||||
*/
|
||||
public boolean trySetOffpeak() {
|
||||
assert !isOffPeakCompaction : "Double-setting off-peak for compaction " + this;
|
||||
synchronized(compactionCountLock) {
|
||||
if (numOutstandingOffPeakCompactions == 0) {
|
||||
numOutstandingOffPeakCompactions++;
|
||||
isOffPeakCompaction = true;
|
||||
}
|
||||
}
|
||||
return (currentHour >= this.offPeakStartHour || currentHour < this.offPeakEndHour);
|
||||
return isOffPeakCompaction;
|
||||
}
|
||||
|
||||
public long getSelectionTime() {
|
||||
return selectionTime;
|
||||
}
|
||||
|
||||
public CompactSelection subList(int start, int end) {
|
||||
|
|
|
@ -0,0 +1,214 @@
|
|||
/**
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.regionserver.compactions;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.regionserver.StoreConfiguration;
|
||||
|
||||
/**
|
||||
* Compaction configuration for a particular instance of HStore.
|
||||
* Takes into account both global settings and ones set on the column family/store.
|
||||
* Control knobs for default compaction algorithm:
|
||||
* <p/>
|
||||
* maxCompactSize - upper bound on file size to be included in minor compactions
|
||||
* minCompactSize - lower bound below which compaction is selected without ratio test
|
||||
* minFilesToCompact - lower bound on number of files in any minor compaction
|
||||
* maxFilesToCompact - upper bound on number of files in any minor compaction
|
||||
* compactionRatio - Ratio used for compaction
|
||||
* <p/>
|
||||
* Set parameter as "hbase.hstore.compaction.<attribute>"
|
||||
*/
|
||||
|
||||
//TODO: revisit this class for online parameter updating (both in xml and on the CF)
|
||||
@InterfaceAudience.Private
|
||||
public class CompactionConfiguration {
|
||||
|
||||
static final Log LOG = LogFactory.getLog(CompactionConfiguration.class);
|
||||
|
||||
private static final String CONFIG_PREFIX = "hbase.hstore.compaction.";
|
||||
|
||||
Configuration conf;
|
||||
StoreConfiguration storeConfig;
|
||||
|
||||
long maxCompactSize;
|
||||
long minCompactSize;
|
||||
int minFilesToCompact;
|
||||
int maxFilesToCompact;
|
||||
double compactionRatio;
|
||||
double offPeekCompactionRatio;
|
||||
int offPeakStartHour;
|
||||
int offPeakEndHour;
|
||||
long throttlePoint;
|
||||
boolean shouldDeleteExpired;
|
||||
long majorCompactionPeriod;
|
||||
float majorCompactionJitter;
|
||||
|
||||
CompactionConfiguration(Configuration conf, StoreConfiguration storeConfig) {
|
||||
this.conf = conf;
|
||||
this.storeConfig = storeConfig;
|
||||
|
||||
maxCompactSize = conf.getLong(CONFIG_PREFIX + "max.size", Long.MAX_VALUE);
|
||||
minCompactSize = conf.getLong(CONFIG_PREFIX + "min.size",
|
||||
storeConfig.getMemstoreFlushSize());
|
||||
minFilesToCompact = Math.max(2, conf.getInt(CONFIG_PREFIX + "min",
|
||||
/*old name*/ conf.getInt("hbase.hstore.compactionThreshold", 3)));
|
||||
maxFilesToCompact = conf.getInt(CONFIG_PREFIX + "max", 10);
|
||||
compactionRatio = conf.getFloat(CONFIG_PREFIX + "ratio", 1.2F);
|
||||
offPeekCompactionRatio = conf.getFloat(CONFIG_PREFIX + "ratio.offpeak", 5.0F);
|
||||
offPeakStartHour = conf.getInt("hbase.offpeak.start.hour", -1);
|
||||
offPeakEndHour = conf.getInt("hbase.offpeak.end.hour", -1);
|
||||
|
||||
if (!isValidHour(offPeakStartHour) || !isValidHour(offPeakEndHour)) {
|
||||
if (!(offPeakStartHour == -1 && offPeakEndHour == -1)) {
|
||||
LOG.warn("Ignoring invalid start/end hour for peak hour : start = " +
|
||||
this.offPeakStartHour + " end = " + this.offPeakEndHour +
|
||||
". Valid numbers are [0-23]");
|
||||
}
|
||||
this.offPeakStartHour = this.offPeakEndHour = -1;
|
||||
}
|
||||
|
||||
throttlePoint = conf.getLong("hbase.regionserver.thread.compaction.throttle",
|
||||
2 * maxFilesToCompact * storeConfig.getMemstoreFlushSize());
|
||||
shouldDeleteExpired = conf.getBoolean("hbase.store.delete.expired.storefile", true);
|
||||
majorCompactionPeriod = conf.getLong(HConstants.MAJOR_COMPACTION_PERIOD, 1000*60*60*24);
|
||||
majorCompactionJitter = conf.getFloat("hbase.hregion.majorcompaction.jitter", 0.20F);
|
||||
|
||||
LOG.info("Compaction configuration " + this.toString());
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return String.format(
|
||||
"size [%d, %d); files [%d, %d); ratio %f; off-peak ratio %f; off-peak hours %d-%d; "
|
||||
+ "throttle point %d;%s delete expired; major period %d, major jitter %f",
|
||||
minCompactSize,
|
||||
maxCompactSize,
|
||||
minFilesToCompact,
|
||||
maxFilesToCompact,
|
||||
compactionRatio,
|
||||
offPeekCompactionRatio,
|
||||
offPeakStartHour,
|
||||
offPeakEndHour,
|
||||
throttlePoint,
|
||||
shouldDeleteExpired ? "" : " don't",
|
||||
majorCompactionPeriod,
|
||||
majorCompactionJitter);
|
||||
}
|
||||
|
||||
/**
|
||||
* @return lower bound below which compaction is selected without ratio test
|
||||
*/
|
||||
long getMinCompactSize() {
|
||||
return minCompactSize;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return upper bound on file size to be included in minor compactions
|
||||
*/
|
||||
long getMaxCompactSize() {
|
||||
return maxCompactSize;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return upper bound on number of files to be included in minor compactions
|
||||
*/
|
||||
int getMinFilesToCompact() {
|
||||
return minFilesToCompact;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return upper bound on number of files to be included in minor compactions
|
||||
*/
|
||||
int getMaxFilesToCompact() {
|
||||
return maxFilesToCompact;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return Ratio used for compaction
|
||||
*/
|
||||
double getCompactionRatio() {
|
||||
return compactionRatio;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return Off peak Ratio used for compaction
|
||||
*/
|
||||
double getCompactionRatioOffPeak() {
|
||||
return offPeekCompactionRatio;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return Hour at which off-peak compactions start
|
||||
*/
|
||||
int getOffPeakStartHour() {
|
||||
return offPeakStartHour;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return Hour at which off-peak compactions end
|
||||
*/
|
||||
int getOffPeakEndHour() {
|
||||
return offPeakEndHour;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return ThrottlePoint used for classifying small and large compactions
|
||||
*/
|
||||
long getThrottlePoint() {
|
||||
return throttlePoint;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return Major compaction period from compaction.
|
||||
* Major compactions are selected periodically according to this parameter plus jitter
|
||||
*/
|
||||
long getMajorCompactionPeriod() {
|
||||
if (storeConfig != null) {
|
||||
Long storeSpecificPeriod = storeConfig.getMajorCompactionPeriod();
|
||||
if (storeSpecificPeriod != null) {
|
||||
return storeSpecificPeriod;
|
||||
}
|
||||
}
|
||||
return majorCompactionPeriod;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return Major the jitter fraction, the fraction within which the major compaction
|
||||
* period is randomly chosen from the majorCompactionPeriod in each store.
|
||||
*/
|
||||
float getMajorCompactionJitter() {
|
||||
return majorCompactionJitter;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return Whether expired files should be deleted ASAP using compactions
|
||||
*/
|
||||
boolean shouldDeleteExpired() {
|
||||
return shouldDeleteExpired;
|
||||
}
|
||||
|
||||
private static boolean isValidHour(int hour) {
|
||||
return (hour >= 0 && hour <= 23);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,409 @@
|
|||
/**
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.regionserver.compactions;
|
||||
|
||||
import com.google.common.base.Predicate;
|
||||
import com.google.common.collect.Collections2;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.HColumnDescriptor;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.regionserver.StoreConfiguration;
|
||||
import org.apache.hadoop.hbase.regionserver.StoreFile;
|
||||
import org.apache.hadoop.hbase.regionserver.StoreUtils;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Calendar;
|
||||
import java.util.GregorianCalendar;
|
||||
import java.util.List;
|
||||
import java.util.Random;
|
||||
|
||||
/**
|
||||
* The default (and only, as of now) algorithm for selecting files for compaction.
|
||||
* Combines the compaction configuration and the provisional file selection that
|
||||
* it's given to produce the list of suitable candidates for compaction.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class CompactionPolicy {
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(CompactionPolicy.class);
|
||||
private final static Calendar calendar = new GregorianCalendar();
|
||||
|
||||
CompactionConfiguration comConf;
|
||||
StoreConfiguration storeConfig;
|
||||
|
||||
public CompactionPolicy(Configuration configuration, StoreConfiguration storeConfig) {
|
||||
updateConfiguration(configuration, storeConfig);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param candidateFiles candidate files, ordered from oldest to newest
|
||||
* @return subset copy of candidate list that meets compaction criteria
|
||||
* @throws java.io.IOException
|
||||
*/
|
||||
public CompactSelection selectCompaction(List<StoreFile> candidateFiles,
|
||||
boolean isUserCompaction, boolean forceMajor)
|
||||
throws IOException {
|
||||
// Prelimanry compaction subject to filters
|
||||
CompactSelection candidateSelection = new CompactSelection(candidateFiles);
|
||||
long cfTtl = this.storeConfig.getStoreFileTtl();
|
||||
if (!forceMajor) {
|
||||
// If there are expired files, only select them so that compaction deletes them
|
||||
if (comConf.shouldDeleteExpired() && (cfTtl != Long.MAX_VALUE)) {
|
||||
CompactSelection expiredSelection = selectExpiredStoreFiles(
|
||||
candidateSelection, EnvironmentEdgeManager.currentTimeMillis() - cfTtl);
|
||||
if (expiredSelection != null) {
|
||||
return expiredSelection;
|
||||
}
|
||||
}
|
||||
candidateSelection = skipLargeFiles(candidateSelection);
|
||||
}
|
||||
|
||||
// Force a major compaction if this is a user-requested major compaction,
|
||||
// or if we do not have too many files to compact and this was requested
|
||||
// as a major compaction.
|
||||
// Or, if there are any references among the candidates.
|
||||
boolean majorCompaction = (
|
||||
(forceMajor && isUserCompaction)
|
||||
|| ((forceMajor || isMajorCompaction(candidateSelection.getFilesToCompact()))
|
||||
&& (candidateSelection.getFilesToCompact().size() < comConf.getMaxFilesToCompact()))
|
||||
|| StoreUtils.hasReferences(candidateSelection.getFilesToCompact())
|
||||
);
|
||||
|
||||
if (!majorCompaction) {
|
||||
// we're doing a minor compaction, let's see what files are applicable
|
||||
candidateSelection = filterBulk(candidateSelection);
|
||||
candidateSelection = applyCompactionPolicy(candidateSelection);
|
||||
candidateSelection = checkMinFilesCriteria(candidateSelection);
|
||||
}
|
||||
candidateSelection =
|
||||
removeExcessFiles(candidateSelection, isUserCompaction, majorCompaction);
|
||||
return candidateSelection;
|
||||
}
|
||||
|
||||
/**
|
||||
* Updates the compaction configuration. Used for tests.
|
||||
* TODO: replace when HBASE-3909 is completed in some form.
|
||||
*/
|
||||
public void updateConfiguration(Configuration configuration,
|
||||
StoreConfiguration storeConfig) {
|
||||
this.comConf = new CompactionConfiguration(configuration, storeConfig);
|
||||
this.storeConfig = storeConfig;
|
||||
}
|
||||
|
||||
/**
|
||||
* Select the expired store files to compact
|
||||
*
|
||||
* @param candidates the initial set of storeFiles
|
||||
* @param maxExpiredTimeStamp
|
||||
* The store file will be marked as expired if its max time stamp is
|
||||
* less than this maxExpiredTimeStamp.
|
||||
* @return A CompactSelection contains the expired store files as
|
||||
* filesToCompact
|
||||
*/
|
||||
private CompactSelection selectExpiredStoreFiles(
|
||||
CompactSelection candidates, long maxExpiredTimeStamp) {
|
||||
List<StoreFile> filesToCompact = candidates.getFilesToCompact();
|
||||
if (filesToCompact == null || filesToCompact.size() == 0)
|
||||
return null;
|
||||
ArrayList<StoreFile> expiredStoreFiles = null;
|
||||
boolean hasExpiredStoreFiles = false;
|
||||
CompactSelection expiredSFSelection = null;
|
||||
|
||||
for (StoreFile storeFile : filesToCompact) {
|
||||
if (storeFile.getReader().getMaxTimestamp() < maxExpiredTimeStamp) {
|
||||
LOG.info("Deleting the expired store file by compaction: "
|
||||
+ storeFile.getPath() + " whose maxTimeStamp is "
|
||||
+ storeFile.getReader().getMaxTimestamp()
|
||||
+ " while the max expired timestamp is " + maxExpiredTimeStamp);
|
||||
if (!hasExpiredStoreFiles) {
|
||||
expiredStoreFiles = new ArrayList<StoreFile>();
|
||||
hasExpiredStoreFiles = true;
|
||||
}
|
||||
expiredStoreFiles.add(storeFile);
|
||||
}
|
||||
}
|
||||
|
||||
if (hasExpiredStoreFiles) {
|
||||
expiredSFSelection = new CompactSelection(expiredStoreFiles);
|
||||
}
|
||||
return expiredSFSelection;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param candidates pre-filtrate
|
||||
* @return filtered subset
|
||||
* exclude all files above maxCompactSize
|
||||
* Also save all references. We MUST compact them
|
||||
*/
|
||||
private CompactSelection skipLargeFiles(CompactSelection candidates) {
|
||||
int pos = 0;
|
||||
while (pos < candidates.getFilesToCompact().size() &&
|
||||
candidates.getFilesToCompact().get(pos).getReader().length() >
|
||||
comConf.getMaxCompactSize() &&
|
||||
!candidates.getFilesToCompact().get(pos).isReference()) {
|
||||
++pos;
|
||||
}
|
||||
if (pos > 0) {
|
||||
LOG.debug("Some files are too large. Excluding " + pos
|
||||
+ " files from compaction candidates");
|
||||
candidates.clearSubList(0, pos);
|
||||
}
|
||||
return candidates;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param candidates pre-filtrate
|
||||
* @return filtered subset
|
||||
* exclude all bulk load files if configured
|
||||
*/
|
||||
private CompactSelection filterBulk(CompactSelection candidates) {
|
||||
candidates.getFilesToCompact().removeAll(Collections2.filter(
|
||||
candidates.getFilesToCompact(),
|
||||
new Predicate<StoreFile>() {
|
||||
@Override
|
||||
public boolean apply(StoreFile input) {
|
||||
return input.excludeFromMinorCompaction();
|
||||
}
|
||||
}));
|
||||
return candidates;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param candidates pre-filtrate
|
||||
* @return filtered subset
|
||||
* take upto maxFilesToCompact from the start
|
||||
*/
|
||||
private CompactSelection removeExcessFiles(CompactSelection candidates,
|
||||
boolean isUserCompaction, boolean isMajorCompaction) {
|
||||
int excess = candidates.getFilesToCompact().size() - comConf.getMaxFilesToCompact();
|
||||
if (excess > 0) {
|
||||
if (isMajorCompaction && isUserCompaction) {
|
||||
LOG.debug("Warning, compacting more than " + comConf.getMaxFilesToCompact() +
|
||||
" files because of a user-requested major compaction");
|
||||
} else {
|
||||
LOG.debug("Too many admissible files. Excluding " + excess
|
||||
+ " files from compaction candidates");
|
||||
candidates.clearSubList(comConf.getMaxFilesToCompact(),
|
||||
candidates.getFilesToCompact().size());
|
||||
}
|
||||
}
|
||||
return candidates;
|
||||
}
|
||||
/**
|
||||
* @param candidates pre-filtrate
|
||||
* @return filtered subset
|
||||
* forget the compactionSelection if we don't have enough files
|
||||
*/
|
||||
private CompactSelection checkMinFilesCriteria(CompactSelection candidates) {
|
||||
int minFiles = comConf.getMinFilesToCompact();
|
||||
if (candidates.getFilesToCompact().size() < minFiles) {
|
||||
if(LOG.isDebugEnabled()) {
|
||||
LOG.debug("Not compacting files because we only have " +
|
||||
candidates.getFilesToCompact().size() +
|
||||
" files ready for compaction. Need " + minFiles + " to initiate.");
|
||||
}
|
||||
candidates.emptyFileList();
|
||||
}
|
||||
return candidates;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param candidates pre-filtrate
|
||||
* @return filtered subset
|
||||
* -- Default minor compaction selection algorithm:
|
||||
* choose CompactSelection from candidates --
|
||||
* First exclude bulk-load files if indicated in configuration.
|
||||
* Start at the oldest file and stop when you find the first file that
|
||||
* meets compaction criteria:
|
||||
* (1) a recently-flushed, small file (i.e. <= minCompactSize)
|
||||
* OR
|
||||
* (2) within the compactRatio of sum(newer_files)
|
||||
* Given normal skew, any newer files will also meet this criteria
|
||||
* <p/>
|
||||
* Additional Note:
|
||||
* If fileSizes.size() >> maxFilesToCompact, we will recurse on
|
||||
* compact(). Consider the oldest files first to avoid a
|
||||
* situation where we always compact [end-threshold,end). Then, the
|
||||
* last file becomes an aggregate of the previous compactions.
|
||||
*
|
||||
* normal skew:
|
||||
*
|
||||
* older ----> newer (increasing seqID)
|
||||
* _
|
||||
* | | _
|
||||
* | | | | _
|
||||
* --|-|- |-|- |-|---_-------_------- minCompactSize
|
||||
* | | | | | | | | _ | |
|
||||
* | | | | | | | | | | | |
|
||||
* | | | | | | | | | | | |
|
||||
*/
|
||||
CompactSelection applyCompactionPolicy(CompactSelection candidates) throws IOException {
|
||||
if (candidates.getFilesToCompact().isEmpty()) {
|
||||
return candidates;
|
||||
}
|
||||
|
||||
// we're doing a minor compaction, let's see what files are applicable
|
||||
int start = 0;
|
||||
double ratio = comConf.getCompactionRatio();
|
||||
if (isOffPeakHour() && candidates.trySetOffpeak()) {
|
||||
ratio = comConf.getCompactionRatioOffPeak();
|
||||
LOG.info("Running an off-peak compaction, selection ratio = " + ratio
|
||||
+ ", numOutstandingOffPeakCompactions is now "
|
||||
+ CompactSelection.getNumOutStandingOffPeakCompactions());
|
||||
}
|
||||
|
||||
// get store file sizes for incremental compacting selection.
|
||||
int countOfFiles = candidates.getFilesToCompact().size();
|
||||
long[] fileSizes = new long[countOfFiles];
|
||||
long[] sumSize = new long[countOfFiles];
|
||||
for (int i = countOfFiles - 1; i >= 0; --i) {
|
||||
StoreFile file = candidates.getFilesToCompact().get(i);
|
||||
fileSizes[i] = file.getReader().length();
|
||||
// calculate the sum of fileSizes[i,i+maxFilesToCompact-1) for algo
|
||||
int tooFar = i + comConf.getMaxFilesToCompact() - 1;
|
||||
sumSize[i] = fileSizes[i]
|
||||
+ ((i + 1 < countOfFiles) ? sumSize[i + 1] : 0)
|
||||
- ((tooFar < countOfFiles) ? fileSizes[tooFar] : 0);
|
||||
}
|
||||
|
||||
|
||||
while (countOfFiles - start >= comConf.getMinFilesToCompact() &&
|
||||
fileSizes[start] > Math.max(comConf.getMinCompactSize(),
|
||||
(long) (sumSize[start + 1] * ratio))) {
|
||||
++start;
|
||||
}
|
||||
if (start < countOfFiles) {
|
||||
LOG.info("Default compaction algorithm has selected " + (countOfFiles - start)
|
||||
+ " files from " + countOfFiles + " candidates");
|
||||
}
|
||||
|
||||
candidates = candidates.getSubList(start, countOfFiles);
|
||||
|
||||
return candidates;
|
||||
}
|
||||
|
||||
/*
|
||||
* @param filesToCompact Files to compact. Can be null.
|
||||
* @return True if we should run a major compaction.
|
||||
*/
|
||||
public boolean isMajorCompaction(final List<StoreFile> filesToCompact)
|
||||
throws IOException {
|
||||
boolean result = false;
|
||||
long mcTime = getNextMajorCompactTime(filesToCompact);
|
||||
if (filesToCompact == null || filesToCompact.isEmpty() || mcTime == 0) {
|
||||
return result;
|
||||
}
|
||||
// TODO: Use better method for determining stamp of last major (HBASE-2990)
|
||||
long lowTimestamp = StoreUtils.getLowestTimestamp(filesToCompact);
|
||||
long now = System.currentTimeMillis();
|
||||
if (lowTimestamp > 0l && lowTimestamp < (now - mcTime)) {
|
||||
// Major compaction time has elapsed.
|
||||
long cfTtl = this.storeConfig.getStoreFileTtl();
|
||||
if (filesToCompact.size() == 1) {
|
||||
// Single file
|
||||
StoreFile sf = filesToCompact.get(0);
|
||||
Long minTimestamp = sf.getMinimumTimestamp();
|
||||
long oldest = (minTimestamp == null)
|
||||
? Long.MIN_VALUE
|
||||
: now - minTimestamp.longValue();
|
||||
if (sf.isMajorCompaction() &&
|
||||
(cfTtl == HConstants.FOREVER || oldest < cfTtl)) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Skipping major compaction of " + this +
|
||||
" because one (major) compacted file only and oldestTime " +
|
||||
oldest + "ms is < ttl=" + cfTtl);
|
||||
}
|
||||
} else if (cfTtl != HConstants.FOREVER && oldest > cfTtl) {
|
||||
LOG.debug("Major compaction triggered on store " + this +
|
||||
", because keyvalues outdated; time since last major compaction " +
|
||||
(now - lowTimestamp) + "ms");
|
||||
result = true;
|
||||
}
|
||||
} else {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Major compaction triggered on store " + this +
|
||||
"; time since last major compaction " + (now - lowTimestamp) + "ms");
|
||||
}
|
||||
result = true;
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
public long getNextMajorCompactTime(final List<StoreFile> filesToCompact) {
|
||||
// default = 24hrs
|
||||
long ret = comConf.getMajorCompactionPeriod();
|
||||
if (ret > 0) {
|
||||
// default = 20% = +/- 4.8 hrs
|
||||
double jitterPct = comConf.getMajorCompactionJitter();
|
||||
if (jitterPct > 0) {
|
||||
long jitter = Math.round(ret * jitterPct);
|
||||
// deterministic jitter avoids a major compaction storm on restart
|
||||
Integer seed = StoreUtils.getDeterministicRandomSeed(filesToCompact);
|
||||
if (seed != null) {
|
||||
double rnd = (new Random(seed)).nextDouble();
|
||||
ret += jitter - Math.round(2L * jitter * rnd);
|
||||
} else {
|
||||
ret = 0; // no storefiles == no major compaction
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param compactionSize Total size of some compaction
|
||||
* @return whether this should be a large or small compaction
|
||||
*/
|
||||
public boolean throttleCompaction(long compactionSize) {
|
||||
return compactionSize > comConf.getThrottlePoint();
|
||||
}
|
||||
|
||||
/**
|
||||
* @param numCandidates Number of candidate store files
|
||||
* @return whether a compactionSelection is possible
|
||||
*/
|
||||
public boolean needsCompaction(int numCandidates) {
|
||||
return numCandidates > comConf.getMinFilesToCompact();
|
||||
}
|
||||
|
||||
/**
|
||||
* @return whether this is off-peak hour
|
||||
*/
|
||||
private boolean isOffPeakHour() {
|
||||
int currentHour = calendar.get(Calendar.HOUR_OF_DAY);
|
||||
int startHour = comConf.getOffPeakStartHour();
|
||||
int endHour = comConf.getOffPeakEndHour();
|
||||
// If offpeak time checking is disabled just return false.
|
||||
if (startHour == endHour) {
|
||||
return false;
|
||||
}
|
||||
if (startHour < endHour) {
|
||||
return (currentHour >= startHour && currentHour < endHour);
|
||||
}
|
||||
return (currentHour >= startHour || currentHour < endHour);
|
||||
}
|
||||
}
|
|
@ -208,6 +208,10 @@ public class CompactionRequest implements Comparable<CompactionRequest>,
|
|||
return p;
|
||||
}
|
||||
|
||||
public long getSelectionTime() {
|
||||
return compactSelection.getSelectionTime();
|
||||
}
|
||||
|
||||
/** Gets the priority for the request */
|
||||
public void setPriority(int p) {
|
||||
this.p = p;
|
||||
|
@ -271,7 +275,7 @@ public class CompactionRequest implements Comparable<CompactionRequest>,
|
|||
server.checkFileSystem();
|
||||
} finally {
|
||||
s.finishRequest(this);
|
||||
LOG.debug("CompactSplitThread status: " + server.compactSplitThread);
|
||||
LOG.debug("CompactSplitThread Status: " + server.compactSplitThread);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -49,8 +49,7 @@ import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
|
|||
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;
|
||||
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
|
||||
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.*;
|
||||
import org.apache.hadoop.hbase.regionserver.wal.HLog;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
@ -302,6 +301,7 @@ public class TestCompaction extends HBaseTestCase {
|
|||
conf.setFloat("hbase.hregion.majorcompaction.jitter", jitterPct);
|
||||
|
||||
HStore s = ((HStore) r.getStore(COLUMN_FAMILY));
|
||||
s.compactionPolicy.updateConfiguration(conf, s);
|
||||
try {
|
||||
createStoreFile(r);
|
||||
createStoreFile(r);
|
||||
|
@ -313,9 +313,11 @@ public class TestCompaction extends HBaseTestCase {
|
|||
assertEquals(2, s.getStorefilesCount());
|
||||
|
||||
// ensure that major compaction time is deterministic
|
||||
long mcTime = s.getNextMajorCompactTime();
|
||||
CompactionPolicy c = s.compactionPolicy;
|
||||
List<StoreFile> storeFiles = s.getStorefiles();
|
||||
long mcTime = c.getNextMajorCompactTime(storeFiles);
|
||||
for (int i = 0; i < 10; ++i) {
|
||||
assertEquals(mcTime, s.getNextMajorCompactTime());
|
||||
assertEquals(mcTime, c.getNextMajorCompactTime(storeFiles));
|
||||
}
|
||||
|
||||
// ensure that the major compaction time is within the variance
|
||||
|
|
|
@ -0,0 +1,321 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.regionserver;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Calendar;
|
||||
import java.util.GregorianCalendar;
|
||||
import java.util.List;
|
||||
|
||||
import junit.framework.TestCase;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.*;
|
||||
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
|
||||
import org.apache.hadoop.hbase.io.hfile.NoOpDataBlockEncoder;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.*;
|
||||
import org.apache.hadoop.hbase.regionserver.wal.HLog;
|
||||
import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
|
||||
@Category(SmallTests.class)
|
||||
public class TestDefaultCompactSelection extends TestCase {
|
||||
private final static Log LOG = LogFactory.getLog(TestDefaultCompactSelection.class);
|
||||
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
|
||||
|
||||
protected Configuration conf;
|
||||
protected HStore store;
|
||||
private static final String DIR=
|
||||
TEST_UTIL.getDataTestDir(TestDefaultCompactSelection.class.getSimpleName()).toString();
|
||||
private static Path TEST_FILE;
|
||||
private CompactionPolicy manager;
|
||||
|
||||
protected static final int minFiles = 3;
|
||||
protected static final int maxFiles = 5;
|
||||
|
||||
protected static final long minSize = 10;
|
||||
protected static final long maxSize = 1000;
|
||||
|
||||
|
||||
@Override
|
||||
public void setUp() throws Exception {
|
||||
// setup config values necessary for store
|
||||
this.conf = TEST_UTIL.getConfiguration();
|
||||
this.conf.setLong(HConstants.MAJOR_COMPACTION_PERIOD, 0);
|
||||
this.conf.setInt("hbase.hstore.compaction.min", minFiles);
|
||||
this.conf.setInt("hbase.hstore.compaction.max", maxFiles);
|
||||
this.conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, minSize);
|
||||
this.conf.setLong("hbase.hstore.compaction.max.size", maxSize);
|
||||
this.conf.setFloat("hbase.hstore.compaction.ratio", 1.0F);
|
||||
|
||||
//Setting up a Store
|
||||
Path basedir = new Path(DIR);
|
||||
String logName = "logs";
|
||||
Path logdir = new Path(DIR, logName);
|
||||
Path oldLogDir = new Path(basedir, HConstants.HREGION_OLDLOGDIR_NAME);
|
||||
HColumnDescriptor hcd = new HColumnDescriptor(Bytes.toBytes("family"));
|
||||
FileSystem fs = FileSystem.get(conf);
|
||||
|
||||
fs.delete(logdir, true);
|
||||
|
||||
HTableDescriptor htd = new HTableDescriptor(Bytes.toBytes("table"));
|
||||
htd.addFamily(hcd);
|
||||
HRegionInfo info = new HRegionInfo(htd.getName(), null, null, false);
|
||||
|
||||
HLog hlog = HLogFactory.createHLog(fs, basedir,
|
||||
logName, conf);
|
||||
HRegion region = HRegion.createHRegion(info, basedir, conf, htd);
|
||||
HRegion.closeHRegion(region);
|
||||
Path tableDir = new Path(basedir, Bytes.toString(htd.getName()));
|
||||
region = new HRegion(tableDir, hlog, fs, conf, info, htd, null);
|
||||
|
||||
store = new HStore(basedir, region, hcd, fs, conf);
|
||||
manager = store.compactionPolicy;
|
||||
|
||||
TEST_FILE = StoreFile.getRandomFilename(fs, store.getHomedir());
|
||||
fs.create(TEST_FILE);
|
||||
}
|
||||
|
||||
// used so our tests don't deal with actual StoreFiles
|
||||
static class MockStoreFile extends StoreFile {
|
||||
long length = 0;
|
||||
boolean isRef = false;
|
||||
long ageInDisk;
|
||||
long sequenceid;
|
||||
|
||||
MockStoreFile(long length, long ageInDisk, boolean isRef, long sequenceid) throws IOException {
|
||||
super(TEST_UTIL.getTestFileSystem(), TEST_FILE, TEST_UTIL.getConfiguration(),
|
||||
new CacheConfig(TEST_UTIL.getConfiguration()), BloomType.NONE,
|
||||
NoOpDataBlockEncoder.INSTANCE);
|
||||
this.length = length;
|
||||
this.isRef = isRef;
|
||||
this.ageInDisk = ageInDisk;
|
||||
this.sequenceid = sequenceid;
|
||||
}
|
||||
|
||||
void setLength(long newLen) {
|
||||
this.length = newLen;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getMaxSequenceId() {
|
||||
return sequenceid;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isMajorCompaction() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isReference() {
|
||||
return this.isRef;
|
||||
}
|
||||
|
||||
@Override
|
||||
public StoreFile.Reader getReader() {
|
||||
final long len = this.length;
|
||||
return new StoreFile.Reader() {
|
||||
@Override
|
||||
public long length() {
|
||||
return len;
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
ArrayList<Long> toArrayList(long... numbers) {
|
||||
ArrayList<Long> result = new ArrayList<Long>();
|
||||
for (long i : numbers) {
|
||||
result.add(i);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
List<StoreFile> sfCreate(long... sizes) throws IOException {
|
||||
ArrayList<Long> ageInDisk = new ArrayList<Long>();
|
||||
for (int i = 0; i < sizes.length; i++) {
|
||||
ageInDisk.add(0L);
|
||||
}
|
||||
return sfCreate(toArrayList(sizes), ageInDisk);
|
||||
}
|
||||
|
||||
List<StoreFile> sfCreate(ArrayList<Long> sizes, ArrayList<Long> ageInDisk)
|
||||
throws IOException {
|
||||
return sfCreate(false, sizes, ageInDisk);
|
||||
}
|
||||
|
||||
List<StoreFile> sfCreate(boolean isReference, long... sizes) throws IOException {
|
||||
ArrayList<Long> ageInDisk = new ArrayList<Long>(sizes.length);
|
||||
for (int i = 0; i < sizes.length; i++) {
|
||||
ageInDisk.add(0L);
|
||||
}
|
||||
return sfCreate(isReference, toArrayList(sizes), ageInDisk);
|
||||
}
|
||||
|
||||
List<StoreFile> sfCreate(boolean isReference, ArrayList<Long> sizes, ArrayList<Long> ageInDisk)
|
||||
throws IOException {
|
||||
List<StoreFile> ret = Lists.newArrayList();
|
||||
for (int i = 0; i < sizes.size(); i++) {
|
||||
ret.add(new MockStoreFile(sizes.get(i), ageInDisk.get(i), isReference, i));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
long[] getSizes(List<StoreFile> sfList) {
|
||||
long[] aNums = new long[sfList.size()];
|
||||
for (int i = 0; i < sfList.size(); ++i) {
|
||||
aNums[i] = sfList.get(i).getReader().length();
|
||||
}
|
||||
return aNums;
|
||||
}
|
||||
|
||||
void compactEquals(List<StoreFile> candidates, long... expected)
|
||||
throws IOException {
|
||||
compactEquals(candidates, false, expected);
|
||||
}
|
||||
|
||||
void compactEquals(List<StoreFile> candidates, boolean forcemajor,
|
||||
long ... expected)
|
||||
throws IOException {
|
||||
store.forceMajor = forcemajor;
|
||||
//Test Default compactions
|
||||
List<StoreFile> actual = store.compactionPolicy
|
||||
.selectCompaction(candidates, false, forcemajor).getFilesToCompact();
|
||||
assertEquals(Arrays.toString(expected), Arrays.toString(getSizes(actual)));
|
||||
store.forceMajor = false;
|
||||
}
|
||||
|
||||
public void testCompactionRatio() throws IOException {
|
||||
/**
|
||||
* NOTE: these tests are specific to describe the implementation of the
|
||||
* current compaction algorithm. Developed to ensure that refactoring
|
||||
* doesn't implicitly alter this.
|
||||
*/
|
||||
long tooBig = maxSize + 1;
|
||||
|
||||
// default case. preserve user ratio on size
|
||||
compactEquals(sfCreate(100,50,23,12,12), 23, 12, 12);
|
||||
// less than compact threshold = don't compact
|
||||
compactEquals(sfCreate(100,50,25,12,12) /* empty */);
|
||||
// greater than compact size = skip those
|
||||
compactEquals(sfCreate(tooBig, tooBig, 700, 700, 700), 700, 700, 700);
|
||||
// big size + threshold
|
||||
compactEquals(sfCreate(tooBig, tooBig, 700,700) /* empty */);
|
||||
// small files = don't care about ratio
|
||||
compactEquals(sfCreate(8,3,1), 8,3,1);
|
||||
/* TODO: add sorting + unit test back in when HBASE-2856 is fixed
|
||||
// sort first so you don't include huge file the tail end.
|
||||
// happens with HFileOutputFormat bulk migration
|
||||
compactEquals(sfCreate(100,50,23,12,12, 500), 23, 12, 12);
|
||||
*/
|
||||
// don't exceed max file compact threshold
|
||||
// note: file selection starts with largest to smallest.
|
||||
compactEquals(sfCreate(7, 6, 5, 4, 3, 2, 1), 7, 6, 5, 4, 3);
|
||||
|
||||
/* MAJOR COMPACTION */
|
||||
// if a major compaction has been forced, then compact everything
|
||||
compactEquals(sfCreate(50,25,12,12), true, 50, 25, 12, 12);
|
||||
// also choose files < threshold on major compaction
|
||||
compactEquals(sfCreate(12,12), true, 12, 12);
|
||||
// even if one of those files is too big
|
||||
compactEquals(sfCreate(tooBig, 12,12), true, tooBig, 12, 12);
|
||||
// don't exceed max file compact threshold, even with major compaction
|
||||
store.forceMajor = true;
|
||||
compactEquals(sfCreate(7, 6, 5, 4, 3, 2, 1), 7, 6, 5, 4, 3);
|
||||
store.forceMajor = false;
|
||||
// if we exceed maxCompactSize, downgrade to minor
|
||||
// if not, it creates a 'snowball effect' when files >> maxCompactSize:
|
||||
// the last file in compaction is the aggregate of all previous compactions
|
||||
compactEquals(sfCreate(100,50,23,12,12), true, 23, 12, 12);
|
||||
conf.setLong(HConstants.MAJOR_COMPACTION_PERIOD, 1);
|
||||
conf.setFloat("hbase.hregion.majorcompaction.jitter", 0);
|
||||
store.compactionPolicy.updateConfiguration(conf, store);
|
||||
try {
|
||||
// trigger an aged major compaction
|
||||
compactEquals(sfCreate(50,25,12,12), 50, 25, 12, 12);
|
||||
// major sure exceeding maxCompactSize also downgrades aged minors
|
||||
compactEquals(sfCreate(100,50,23,12,12), 23, 12, 12);
|
||||
} finally {
|
||||
conf.setLong(HConstants.MAJOR_COMPACTION_PERIOD, 1000*60*60*24);
|
||||
conf.setFloat("hbase.hregion.majorcompaction.jitter", 0.20F);
|
||||
}
|
||||
|
||||
/* REFERENCES == file is from a region that was split */
|
||||
// treat storefiles that have references like a major compaction
|
||||
compactEquals(sfCreate(true, 100,50,25,12,12), 100, 50, 25, 12, 12);
|
||||
// reference files shouldn't obey max threshold
|
||||
compactEquals(sfCreate(true, tooBig, 12,12), tooBig, 12, 12);
|
||||
// reference files should obey max file compact to avoid OOM
|
||||
compactEquals(sfCreate(true, 7, 6, 5, 4, 3, 2, 1), 7, 6, 5, 4, 3);
|
||||
|
||||
// empty case
|
||||
compactEquals(new ArrayList<StoreFile>() /* empty */);
|
||||
// empty case (because all files are too big)
|
||||
compactEquals(sfCreate(tooBig, tooBig) /* empty */);
|
||||
}
|
||||
|
||||
public void testOffPeakCompactionRatio() throws IOException {
|
||||
/*
|
||||
* NOTE: these tests are specific to describe the implementation of the
|
||||
* current compaction algorithm. Developed to ensure that refactoring
|
||||
* doesn't implicitly alter this.
|
||||
*/
|
||||
long tooBig = maxSize + 1;
|
||||
|
||||
Calendar calendar = new GregorianCalendar();
|
||||
int hourOfDay = calendar.get(Calendar.HOUR_OF_DAY);
|
||||
LOG.debug("Hour of day = " + hourOfDay);
|
||||
int hourPlusOne = ((hourOfDay+1)%24);
|
||||
int hourMinusOne = ((hourOfDay-1+24)%24);
|
||||
int hourMinusTwo = ((hourOfDay-2+24)%24);
|
||||
|
||||
// check compact selection without peak hour setting
|
||||
LOG.debug("Testing compact selection without off-peak settings...");
|
||||
compactEquals(sfCreate(999,50,12,12,1), 12, 12, 1);
|
||||
|
||||
// set an off-peak compaction threshold
|
||||
this.conf.setFloat("hbase.hstore.compaction.ratio.offpeak", 5.0F);
|
||||
|
||||
// set peak hour to current time and check compact selection
|
||||
this.conf.setLong("hbase.offpeak.start.hour", hourMinusOne);
|
||||
this.conf.setLong("hbase.offpeak.end.hour", hourPlusOne);
|
||||
LOG.debug("Testing compact selection with off-peak settings (" +
|
||||
hourMinusOne + ", " + hourPlusOne + ")");
|
||||
store.compactionPolicy.updateConfiguration(this.conf, store);
|
||||
compactEquals(sfCreate(999, 50, 12, 12, 1), 50, 12, 12, 1);
|
||||
|
||||
// set peak hour outside current selection and check compact selection
|
||||
this.conf.setLong("hbase.offpeak.start.hour", hourMinusTwo);
|
||||
this.conf.setLong("hbase.offpeak.end.hour", hourMinusOne);
|
||||
store.compactionPolicy.updateConfiguration(this.conf, store);
|
||||
LOG.debug("Testing compact selection with off-peak settings (" +
|
||||
hourMinusTwo + ", " + hourMinusOne + ")");
|
||||
compactEquals(sfCreate(999,50,12,12, 1), 12, 12, 1);
|
||||
}
|
||||
}
|
|
@ -248,17 +248,15 @@ public class TestStore extends TestCase {
|
|||
flush(i);
|
||||
}
|
||||
// after flush; check the lowest time stamp
|
||||
long lowestTimeStampFromStore =
|
||||
HStore.getLowestTimestamp(store.getStorefiles());
|
||||
long lowestTimeStampFromFS =
|
||||
getLowestTimeStampFromFS(fs,store.getStorefiles());
|
||||
assertEquals(lowestTimeStampFromStore,lowestTimeStampFromFS);
|
||||
|
||||
long lowestTimeStampFromManager = StoreUtils.getLowestTimestamp(store.getStorefiles());
|
||||
long lowestTimeStampFromFS = getLowestTimeStampFromFS(fs, store.getStorefiles());
|
||||
assertEquals(lowestTimeStampFromManager,lowestTimeStampFromFS);
|
||||
|
||||
// after compact; check the lowest time stamp
|
||||
store.compact(store.requestCompaction());
|
||||
lowestTimeStampFromStore = HStore.getLowestTimestamp(store.getStorefiles());
|
||||
lowestTimeStampFromFS = getLowestTimeStampFromFS(fs,store.getStorefiles());
|
||||
assertEquals(lowestTimeStampFromStore,lowestTimeStampFromFS);
|
||||
lowestTimeStampFromManager = StoreUtils.getLowestTimestamp(store.getStorefiles());
|
||||
lowestTimeStampFromFS = getLowestTimeStampFromFS(fs, store.getStorefiles());
|
||||
assertEquals(lowestTimeStampFromManager, lowestTimeStampFromFS);
|
||||
}
|
||||
|
||||
private static long getLowestTimeStampFromFS(FileSystem fs,
|
||||
|
|
Loading…
Reference in New Issue