HBASE-7110 refactor the compaction selection and config code similarly to 0.89-fb changes

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1413912 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Michael Stack 2012-11-26 22:51:37 +00:00
parent 6351fbaa34
commit 3d759b79c8
12 changed files with 948 additions and 489 deletions

View File

@ -4172,7 +4172,7 @@ public class HRegion implements HeapSize { // , Writable{
}
/**
* @return True if needs a mojor compaction.
* @return True if needs a major compaction.
* @throws IOException
*/
boolean isMajorCompaction() throws IOException {

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@ -63,9 +64,7 @@ import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.io.hfile.InvalidHFileException;
import org.apache.hadoop.hbase.io.hfile.NoOpDataBlockEncoder;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.regionserver.compactions.CompactSelection;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.compactions.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ChecksumType;
import org.apache.hadoop.hbase.util.ClassSize;
@ -75,8 +74,6 @@ import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.util.StringUtils;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.collect.Collections2;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
@ -103,8 +100,9 @@ import com.google.common.collect.Lists;
* <p>Locking and transactions are handled at a higher level. This API should
* not be called directly but by an HRegion manager.
*/
//TODO: move StoreConfiguration implementation into a separate class.
@InterfaceAudience.Private
public class HStore implements Store {
public class HStore implements Store, StoreConfiguration {
static final Log LOG = LogFactory.getLog(HStore.class);
protected final MemStore memstore;
@ -112,15 +110,12 @@ public class HStore implements Store {
private final Path homedir;
private final HRegion region;
private final HColumnDescriptor family;
CompactionPolicy compactionPolicy;
final FileSystem fs;
final Configuration conf;
final CacheConfig cacheConf;
// ttl in milliseconds.
// ttl in milliseconds. TODO: can this be removed? Already stored in scanInfo.
private long ttl;
private final int minFilesToCompact;
private final int maxFilesToCompact;
private final long minCompactSize;
private final long maxCompactSize;
private long lastCompactSize = 0;
volatile boolean forceMajor = false;
/* how many bytes to write between status checks */
@ -193,7 +188,7 @@ public class HStore implements Store {
this.comparator = info.getComparator();
// Get TTL
this.ttl = getTTL(family);
this.ttl = determineTTLFromFamily(family);
// used by ScanQueryMatcher
long timeToPurgeDeletes =
Math.max(conf.getLong("hbase.hstore.time.to.purge.deletes", 0), 0);
@ -204,23 +199,11 @@ public class HStore implements Store {
scanInfo = new ScanInfo(family, ttl, timeToPurgeDeletes, this.comparator);
this.memstore = new MemStore(conf, this.comparator);
// By default, compact if storefile.count >= minFilesToCompact
this.minFilesToCompact = Math.max(2,
conf.getInt("hbase.hstore.compaction.min",
/*old name*/ conf.getInt("hbase.hstore.compactionThreshold", 3)));
LOG.info("hbase.hstore.compaction.min = " + this.minFilesToCompact);
// Setting up cache configuration for this family
this.cacheConf = new CacheConfig(conf, family);
this.blockingStoreFileCount =
conf.getInt("hbase.hstore.blockingStoreFiles", 7);
this.maxFilesToCompact = conf.getInt("hbase.hstore.compaction.max", 10);
this.minCompactSize = conf.getLong("hbase.hstore.compaction.min.size",
this.region.memstoreFlushSize);
this.maxCompactSize
= conf.getLong("hbase.hstore.compaction.max.size", Long.MAX_VALUE);
this.verifyBulkLoads = conf.getBoolean("hbase.hstore.bulkload.verify", false);
if (HStore.closeCheckInterval == 0) {
@ -234,14 +217,16 @@ public class HStore implements Store {
// initilize bytes per checksum
this.bytesPerChecksum = getBytesPerChecksum(conf);
// Create a compaction tool instance
this.compactor = new Compactor(this.conf);
this.compactor = new Compactor(conf);
// Create a compaction manager.
this.compactionPolicy = new CompactionPolicy(conf, this);
}
/**
* @param family
* @return
*/
long getTTL(final HColumnDescriptor family) {
private static long determineTTLFromFamily(final HColumnDescriptor family) {
// HCD.getTimeToLive returns ttl in seconds. Convert to milliseconds.
long ttl = family.getTimeToLive();
if (ttl == HConstants.FOREVER) {
@ -285,6 +270,22 @@ public class HStore implements Store {
return this.fs;
}
/* Implementation of StoreConfiguration */
public long getStoreFileTtl() {
// TTL only applies if there's no MIN_VERSIONs setting on the column.
return (this.scanInfo.getMinVersions() == 0) ? this.ttl : Long.MAX_VALUE;
}
public Long getMajorCompactionPeriod() {
String strCompactionTime = this.family.getValue(HConstants.MAJOR_COMPACTION_PERIOD);
return (strCompactionTime != null) ? new Long(strCompactionTime) : null;
}
public long getMemstoreFlushSize() {
return this.region.memstoreFlushSize;
}
/* End implementation of StoreConfiguration */
/**
* Returns the configured bytesPerChecksum value.
* @param conf The configuration
@ -352,7 +353,8 @@ public class HStore implements Store {
* @param family family name of this store
* @return Path to the family/Store home directory
*/
public static Path getStoreHomedir(final Path parentRegionDirectory, final byte[] family) {
public static Path getStoreHomedir(final Path parentRegionDirectory,
final byte[] family) {
return new Path(parentRegionDirectory, new Path(Bytes.toString(family)));
}
/**
@ -566,7 +568,8 @@ public class HStore implements Store {
"the destination store. Copying file over to destination filesystem.");
Path tmpPath = getTmpPath();
FileUtil.copy(srcFs, srcPath, fs, tmpPath, false, conf);
LOG.info("Copied " + srcPath + " to temporary path on destination filesystem: " + tmpPath);
LOG.info("Copied " + srcPath
+ " to temporary path on destination filesystem: " + tmpPath);
srcPath = tmpPath;
}
@ -663,8 +666,8 @@ public class HStore implements Store {
/**
* Snapshot this stores memstore. Call before running
* {@link #flushCache(long, SortedSet, TimeRangeTracker, AtomicLong, MonitoredTask)} so it has
* some work to do.
* {@link #flushCache(long, SortedSet, TimeRangeTracker, AtomicLong, MonitoredTask)}
* so it has some work to do.
*/
void snapshot() {
this.memstore.snapshot();
@ -722,7 +725,8 @@ public class HStore implements Store {
InternalScanner scanner = null;
KeyValueScanner memstoreScanner = new CollectionBackedScanner(set, this.comparator);
if (getHRegion().getCoprocessorHost() != null) {
scanner = getHRegion().getCoprocessorHost().preFlushScannerOpen(this, memstoreScanner);
scanner = getHRegion().getCoprocessorHost()
.preFlushScannerOpen(this, memstoreScanner);
}
if (scanner == null) {
Scan scan = new Scan();
@ -759,7 +763,8 @@ public class HStore implements Store {
if (!kvs.isEmpty()) {
for (KeyValue kv : kvs) {
// If we know that this KV is going to be included always, then let us
// set its memstoreTS to 0. This will help us save space when writing to disk.
// set its memstoreTS to 0. This will help us save space when writing to
// disk.
if (kv.getMemstoreTS() <= smallestReadPoint) {
// let us not change the original KV. It could be in the memstore
// changing its memstoreTS could affect other threads/scanners.
@ -774,7 +779,8 @@ public class HStore implements Store {
} while (hasMore);
} finally {
// Write out the log sequence number that corresponds to this output
// hfile. The hfile is current up to and including logCacheFlushId.
// hfile. Also write current time in metadata as minFlushTime.
// The hfile is current up to and including logCacheFlushId.
status.setStatus("Flushing " + this + ": appending metadata");
writer.appendMetadata(logCacheFlushId, false);
status.setStatus("Flushing " + this + ": closing flushed file");
@ -1004,12 +1010,12 @@ public class HStore implements Store {
// Ready to go. Have list of files to compact.
LOG.info("Starting compaction of " + filesToCompact.size() + " file(s) in "
+ this + " of "
+ this.region.getRegionInfo().getRegionNameAsString()
+ this + " of " + this.region.getRegionInfo().getRegionNameAsString()
+ " into tmpdir=" + region.getTmpDir() + ", seqid=" + maxId + ", totalSize="
+ StringUtils.humanReadableInt(cr.getSize()));
StoreFile sf = null;
long compactionStartTime = EnvironmentEdgeManager.currentTimeMillis();
try {
StoreFile.Writer writer =
this.compactor.compact(this, filesToCompact, cr.isMajor(), maxId);
@ -1031,6 +1037,7 @@ public class HStore implements Store {
}
}
long now = EnvironmentEdgeManager.currentTimeMillis();
LOG.info("Completed" + (cr.isMajor() ? " major " : " ") + "compaction of "
+ filesToCompact.size() + " file(s) in " + this + " of "
+ this.region.getRegionInfo().getRegionNameAsString()
@ -1038,8 +1045,11 @@ public class HStore implements Store {
(sf == null ? "none" : sf.getPath().getName()) +
", size=" + (sf == null ? "none" :
StringUtils.humanReadableInt(sf.getReader().length()))
+ "; total size for store is "
+ StringUtils.humanReadableInt(storeSize));
+ "; total size for store is " + StringUtils.humanReadableInt(storeSize)
+ ". This selection was in queue for "
+ StringUtils.formatTimeDiff(compactionStartTime, cr.getSelectionTime())
+ ", and took " + StringUtils.formatTimeDiff(now, compactionStartTime)
+ " to execute.");
return sf;
}
@ -1094,38 +1104,7 @@ public class HStore implements Store {
@Override
public boolean hasReferences() {
return hasReferences(this.storefiles);
}
/*
* @param files
* @return True if any of the files in <code>files</code> are References.
*/
private boolean hasReferences(Collection<StoreFile> files) {
if (files != null && files.size() > 0) {
for (StoreFile hsf: files) {
if (hsf.isReference()) {
return true;
}
}
}
return false;
}
/*
* Gets lowest timestamp from candidate StoreFiles
*
* @param fs
* @param dir
* @throws IOException
*/
public static long getLowestTimestamp(final List<StoreFile> candidates)
throws IOException {
long minTs = Long.MAX_VALUE;
for (StoreFile storeFile : candidates) {
minTs = Math.min(minTs, storeFile.getModificationTimeStamp());
}
return minTs;
return StoreUtils.hasReferences(this.storefiles);
}
@Override
@ -1143,91 +1122,7 @@ public class HStore implements Store {
}
List<StoreFile> candidates = new ArrayList<StoreFile>(this.storefiles);
// exclude files above the max compaction threshold
// except: save all references. we MUST compact them
int pos = 0;
while (pos < candidates.size() &&
candidates.get(pos).getReader().length() > this.maxCompactSize &&
!candidates.get(pos).isReference()) ++pos;
candidates.subList(0, pos).clear();
return isMajorCompaction(candidates);
}
/*
* @param filesToCompact Files to compact. Can be null.
* @return True if we should run a major compaction.
*/
private boolean isMajorCompaction(final List<StoreFile> filesToCompact) throws IOException {
boolean result = false;
long mcTime = getNextMajorCompactTime();
if (filesToCompact == null || filesToCompact.isEmpty() || mcTime == 0) {
return result;
}
// TODO: Use better method for determining stamp of last major (HBASE-2990)
long lowTimestamp = getLowestTimestamp(filesToCompact);
long now = System.currentTimeMillis();
if (lowTimestamp > 0l && lowTimestamp < (now - mcTime)) {
// Major compaction time has elapsed.
if (filesToCompact.size() == 1) {
// Single file
StoreFile sf = filesToCompact.get(0);
long oldest =
(sf.getReader().timeRangeTracker == null) ?
Long.MIN_VALUE :
now - sf.getReader().timeRangeTracker.minimumTimestamp;
if (sf.isMajorCompaction() &&
(this.ttl == HConstants.FOREVER || oldest < this.ttl)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Skipping major compaction of " + this +
" because one (major) compacted file only and oldestTime " +
oldest + "ms is < ttl=" + this.ttl);
}
} else if (this.ttl != HConstants.FOREVER && oldest > this.ttl) {
LOG.debug("Major compaction triggered on store " + this +
", because keyvalues outdated; time since last major compaction " +
(now - lowTimestamp) + "ms");
result = true;
}
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Major compaction triggered on store " + this +
"; time since last major compaction " + (now - lowTimestamp) + "ms");
}
result = true;
}
}
return result;
}
long getNextMajorCompactTime() {
// default = 24hrs
long ret = conf.getLong(HConstants.MAJOR_COMPACTION_PERIOD, 1000*60*60*24);
if (family.getValue(HConstants.MAJOR_COMPACTION_PERIOD) != null) {
String strCompactionTime =
family.getValue(HConstants.MAJOR_COMPACTION_PERIOD);
ret = (new Long(strCompactionTime)).longValue();
}
if (ret > 0) {
// default = 20% = +/- 4.8 hrs
double jitterPct = conf.getFloat("hbase.hregion.majorcompaction.jitter",
0.20F);
if (jitterPct > 0) {
long jitter = Math.round(ret * jitterPct);
// deterministic jitter avoids a major compaction storm on restart
ImmutableList<StoreFile> snapshot = storefiles;
if (snapshot != null && !snapshot.isEmpty()) {
String seed = snapshot.get(0).getPath().getName();
double curRand = new Random(seed.hashCode()).nextDouble();
ret += jitter - Math.round(2L * jitter * curRand);
} else {
ret = 0; // no storefiles == no major compaction
}
}
}
return ret;
return compactionPolicy.isMajorCompaction(candidates);
}
public CompactionRequest requestCompaction() throws IOException {
@ -1263,9 +1158,11 @@ public class HStore implements Store {
CompactSelection filesToCompact;
if (override) {
// coprocessor is overriding normal file selection
filesToCompact = new CompactSelection(conf, candidates);
filesToCompact = new CompactSelection(candidates);
} else {
filesToCompact = compactSelection(candidates, priority);
boolean isUserCompaction = priority == Store.PRIORITY_USER;
filesToCompact = compactionPolicy.selectCompaction(candidates, isUserCompaction,
forceMajor && filesCompacting.isEmpty());
}
if (region.getCoprocessorHost() != null) {
@ -1288,12 +1185,17 @@ public class HStore implements Store {
Collections.sort(filesCompacting, StoreFile.Comparators.SEQ_ID);
// major compaction iff all StoreFiles are included
boolean isMajor = (filesToCompact.getFilesToCompact().size() == this.storefiles.size());
boolean isMajor =
(filesToCompact.getFilesToCompact().size() == this.storefiles.size());
if (isMajor) {
// since we're enqueuing a major, update the compaction wait interval
this.forceMajor = false;
}
LOG.debug(getHRegion().regionInfo.getEncodedName() + " - " +
getColumnFamilyName() + ": Initiating " +
(isMajor ? "major" : "minor") + " compaction");
// everything went better than expected. create a compaction request
int pri = getCompactPriority(priority);
ret = new CompactionRequest(region, this, filesToCompact, isMajor, pri);
@ -1315,191 +1217,6 @@ public class HStore implements Store {
}
}
/**
* Algorithm to choose which files to compact, see {@link #compactSelection(java.util.List, int)}
* @param candidates
* @return
* @throws IOException
*/
CompactSelection compactSelection(List<StoreFile> candidates) throws IOException {
return compactSelection(candidates,Store.NO_PRIORITY);
}
/**
* Algorithm to choose which files to compact
*
* Configuration knobs:
* "hbase.hstore.compaction.ratio"
* normal case: minor compact when file <= sum(smaller_files) * ratio
* "hbase.hstore.compaction.min.size"
* unconditionally compact individual files below this size
* "hbase.hstore.compaction.max.size"
* never compact individual files above this size (unless splitting)
* "hbase.hstore.compaction.min"
* min files needed to minor compact
* "hbase.hstore.compaction.max"
* max files to compact at once (avoids OOM)
*
* @param candidates candidate files, ordered from oldest to newest
* @return subset copy of candidate list that meets compaction criteria
* @throws IOException
*/
CompactSelection compactSelection(List<StoreFile> candidates, int priority)
throws IOException {
// ASSUMPTION!!! filesCompacting is locked when calling this function
/* normal skew:
*
* older ----> newer
* _
* | | _
* | | | | _
* --|-|- |-|- |-|---_-------_------- minCompactSize
* | | | | | | | | _ | |
* | | | | | | | | | | | |
* | | | | | | | | | | | |
*/
CompactSelection compactSelection = new CompactSelection(conf, candidates);
boolean forcemajor = this.forceMajor && filesCompacting.isEmpty();
if (!forcemajor) {
// Delete the expired store files before the compaction selection.
if (conf.getBoolean("hbase.store.delete.expired.storefile", true)
&& (ttl != Long.MAX_VALUE) && (this.scanInfo.minVersions == 0)) {
CompactSelection expiredSelection = compactSelection
.selectExpiredStoreFilesToCompact(
EnvironmentEdgeManager.currentTimeMillis() - this.ttl);
// If there is any expired store files, delete them by compaction.
if (expiredSelection != null) {
return expiredSelection;
}
}
// do not compact old files above a configurable threshold
// save all references. we MUST compact them
int pos = 0;
while (pos < compactSelection.getFilesToCompact().size() &&
compactSelection.getFilesToCompact().get(pos).getReader().length()
> maxCompactSize &&
!compactSelection.getFilesToCompact().get(pos).isReference()) ++pos;
if (pos != 0) compactSelection.clearSubList(0, pos);
}
if (compactSelection.getFilesToCompact().isEmpty()) {
LOG.debug(this.getHRegionInfo().getEncodedName() + " - " +
this + ": no store files to compact");
compactSelection.emptyFileList();
return compactSelection;
}
// Force a major compaction if this is a user-requested major compaction,
// or if we do not have too many files to compact and this was requested
// as a major compaction
boolean majorcompaction = (forcemajor && priority == Store.PRIORITY_USER) ||
(forcemajor || isMajorCompaction(compactSelection.getFilesToCompact())) &&
(compactSelection.getFilesToCompact().size() < this.maxFilesToCompact
);
LOG.debug(this.getHRegionInfo().getEncodedName() + " - "
+ this.getColumnFamilyName() + ": Initiating " +
(majorcompaction ? "major" : "minor") + "compaction");
if (!majorcompaction &&
!hasReferences(compactSelection.getFilesToCompact())) {
// we're doing a minor compaction, let's see what files are applicable
int start = 0;
double r = compactSelection.getCompactSelectionRatio();
// remove bulk import files that request to be excluded from minors
compactSelection.getFilesToCompact().removeAll(Collections2.filter(
compactSelection.getFilesToCompact(),
new Predicate<StoreFile>() {
public boolean apply(StoreFile input) {
return input.excludeFromMinorCompaction();
}
}));
// skip selection algorithm if we don't have enough files
if (compactSelection.getFilesToCompact().size() < this.minFilesToCompact) {
if(LOG.isDebugEnabled()) {
LOG.debug("Not compacting files because we only have " +
compactSelection.getFilesToCompact().size() +
" files ready for compaction. Need " + this.minFilesToCompact + " to initiate.");
}
compactSelection.emptyFileList();
return compactSelection;
}
/* TODO: add sorting + unit test back in when HBASE-2856 is fixed
// Sort files by size to correct when normal skew is altered by bulk load.
Collections.sort(filesToCompact, StoreFile.Comparators.FILE_SIZE);
*/
// get store file sizes for incremental compacting selection.
int countOfFiles = compactSelection.getFilesToCompact().size();
long [] fileSizes = new long[countOfFiles];
long [] sumSize = new long[countOfFiles];
for (int i = countOfFiles-1; i >= 0; --i) {
StoreFile file = compactSelection.getFilesToCompact().get(i);
fileSizes[i] = file.getReader().length();
// calculate the sum of fileSizes[i,i+maxFilesToCompact-1) for algo
int tooFar = i + this.maxFilesToCompact - 1;
sumSize[i] = fileSizes[i]
+ ((i+1 < countOfFiles) ? sumSize[i+1] : 0)
- ((tooFar < countOfFiles) ? fileSizes[tooFar] : 0);
}
/* Start at the oldest file and stop when you find the first file that
* meets compaction criteria:
* (1) a recently-flushed, small file (i.e. <= minCompactSize)
* OR
* (2) within the compactRatio of sum(newer_files)
* Given normal skew, any newer files will also meet this criteria
*
* Additional Note:
* If fileSizes.size() >> maxFilesToCompact, we will recurse on
* compact(). Consider the oldest files first to avoid a
* situation where we always compact [end-threshold,end). Then, the
* last file becomes an aggregate of the previous compactions.
*/
while(countOfFiles - start >= this.minFilesToCompact &&
fileSizes[start] >
Math.max(minCompactSize, (long)(sumSize[start+1] * r))) {
++start;
}
int end = Math.min(countOfFiles, start + this.maxFilesToCompact);
long totalSize = fileSizes[start]
+ ((start+1 < countOfFiles) ? sumSize[start+1] : 0);
compactSelection = compactSelection.getSubList(start, end);
// if we don't have enough files to compact, just wait
if (compactSelection.getFilesToCompact().size() < this.minFilesToCompact) {
if (LOG.isDebugEnabled()) {
LOG.debug("Skipped compaction of " + this
+ ". Only " + (end - start) + " file(s) of size "
+ StringUtils.humanReadableInt(totalSize)
+ " have met compaction criteria.");
}
compactSelection.emptyFileList();
return compactSelection;
}
} else {
if(majorcompaction) {
if (compactSelection.getFilesToCompact().size() > this.maxFilesToCompact) {
LOG.debug("Warning, compacting more than " + this.maxFilesToCompact +
" files, probably because of a user-requested major compaction");
if(priority != Store.PRIORITY_USER) {
LOG.error("Compacting more than max files on a non user-requested compaction");
}
}
} else if (compactSelection.getFilesToCompact().size() > this.maxFilesToCompact) {
// all files included in this compaction, up to max
int pastMax = compactSelection.getFilesToCompact().size() - this.maxFilesToCompact;
compactSelection.getFilesToCompact().subList(0, pastMax).clear();
}
}
return compactSelection;
}
/**
* Validates a store file by opening and closing it. In HFileV2 this should
* not be an expensive operation.
@ -1597,8 +1314,8 @@ public class HStore implements Store {
// let the archive util decide if we should archive or delete the files
LOG.debug("Removing store files after compaction...");
HFileArchiver.archiveStoreFiles(this.fs, this.region, this.conf, this.family.getName(),
compactedFiles);
HFileArchiver.archiveStoreFiles(this.fs, this.region, this.conf,
this.family.getName(), compactedFiles);
} catch (IOException e) {
e = RemoteExceptionHandler.checkIOException(e);
@ -2005,11 +1722,7 @@ public class HStore implements Store {
@Override
public boolean throttleCompaction(long compactionSize) {
// see HBASE-5867 for discussion on the default
long throttlePoint = conf.getLong(
"hbase.regionserver.thread.compaction.throttle",
2 * this.minFilesToCompact * this.region.memstoreFlushSize);
return compactionSize > throttlePoint;
return compactionPolicy.throttleCompaction(compactionSize);
}
@Override
@ -2115,7 +1828,7 @@ public class HStore implements Store {
@Override
public boolean needsCompaction() {
return (storefiles.size() - filesCompacting.size()) > minFilesToCompact;
return compactionPolicy.needsCompaction(storefiles.size() - filesCompacting.size());
}
@Override
@ -2124,8 +1837,8 @@ public class HStore implements Store {
}
public static final long FIXED_OVERHEAD =
ClassSize.align((19 * ClassSize.REFERENCE) + (6 * Bytes.SIZEOF_LONG)
+ (5 * Bytes.SIZEOF_INT) + Bytes.SIZEOF_BOOLEAN);
ClassSize.align((20 * ClassSize.REFERENCE) + (6 * Bytes.SIZEOF_LONG)
+ (3 * Bytes.SIZEOF_INT) + Bytes.SIZEOF_BOOLEAN);
public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD
+ ClassSize.OBJECT + ClassSize.REENTRANT_LOCK

View File

@ -0,0 +1,50 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
/**
* The class that contains shared information about various knobs of a Store/HStore object.
* Unlike the configuration objects that merely return the XML values, the implementations
* should return ready-to-use applicable values for corresponding calls, after all the
* parsing/validation/adjustment for other considerations, so that we don't have to repeat
* this logic in multiple places.
* TODO: move methods and logic here as necessary.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public interface StoreConfiguration {
/**
* Gets the cf-specific major compaction period.
*/
public Long getMajorCompactionPeriod();
/**
* Gets the Memstore flush size for the region that this store works with.
*/
public long getMemstoreFlushSize();
/**
* Gets the cf-specific time-to-live for store files.
*/
public long getStoreFileTtl();
}

View File

@ -65,6 +65,7 @@ import org.apache.hadoop.hbase.util.BloomFilter;
import org.apache.hadoop.hbase.util.BloomFilterFactory;
import org.apache.hadoop.hbase.util.BloomFilterWriter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.io.RawComparator;
@ -295,7 +296,7 @@ public class StoreFile {
* @return True if this is a StoreFile Reference; call after {@link #open()}
* else may get wrong answer.
*/
boolean isReference() {
public boolean isReference() {
return this.reference != null;
}
@ -357,7 +358,7 @@ public class StoreFile {
/**
* @return True if this file was made by a major compaction.
*/
boolean isMajorCompaction() {
public boolean isMajorCompaction() {
if (this.majorCompaction == null) {
throw new NullPointerException("This has not been set yet");
}
@ -367,7 +368,7 @@ public class StoreFile {
/**
* @return True if this file should not be part of a minor compaction.
*/
boolean excludeFromMinorCompaction() {
public boolean excludeFromMinorCompaction() {
return this.excludeFromMinorCompaction;
}
@ -579,7 +580,6 @@ public class StoreFile {
}
}
}
this.reader.setSequenceID(this.sequenceid);
b = metadataMap.get(HFileWriterV2.MAX_MEMSTORE_TS_KEY);
@ -945,6 +945,11 @@ public class StoreFile {
return r.write(fs, p);
}
public Long getMinimumTimestamp() {
return (getReader().timeRangeTracker == null) ?
null :
getReader().timeRangeTracker.minimumTimestamp;
}
/**
* A StoreFile writer. Use this to read/write HBase Store Files. It is package

View File

@ -0,0 +1,64 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
/**
* Utility functions for region server storage layer.
*/
public class StoreUtils {
/**
* Creates a deterministic hash code for store file collection.
*/
public static Integer getDeterministicRandomSeed(final List<StoreFile> files) {
if (files != null && !files.isEmpty()) {
return files.get(0).getPath().getName().hashCode();
}
return null;
}
/**
* Determines whether any files in the collection are references.
*/
public static boolean hasReferences(final Collection<StoreFile> files) {
if (files != null && files.size() > 0) {
for (StoreFile hsf: files) {
if (hsf.isReference()) {
return true;
}
}
}
return false;
}
/**
* Gets lowest timestamp from candidate StoreFiles
*/
public static long getLowestTimestamp(final List<StoreFile> candidates)
throws IOException {
long minTs = Long.MAX_VALUE;
for (StoreFile storeFile : candidates) {
minTs = Math.min(minTs, storeFile.getModificationTimeStamp());
}
return minTs;
}
}

View File

@ -19,15 +19,13 @@
package org.apache.hadoop.hbase.regionserver.compactions;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.GregorianCalendar;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@InterfaceAudience.Private
public class CompactSelection {
@ -48,37 +46,15 @@ public class CompactSelection {
*/
private final static Object compactionCountLock = new Object();
// HBase conf object
Configuration conf;
// was this compaction promoted to an off-peak
boolean isOffPeakCompaction = false;
// compactRatio: double on purpose! Float.MAX < Long.MAX < Double.MAX
// With float, java will downcast your long to float for comparisons (bad)
private double compactRatio;
// compaction ratio off-peak
private double compactRatioOffPeak;
// offpeak start time
private int offPeakStartHour = -1;
// off peak end time
private int offPeakEndHour = -1;
// CompactSelection object creation time.
private final long selectionTime;
public CompactSelection(Configuration conf, List<StoreFile> filesToCompact) {
public CompactSelection(List<StoreFile> filesToCompact) {
this.selectionTime = EnvironmentEdgeManager.currentTimeMillis();
this.filesToCompact = filesToCompact;
this.conf = conf;
this.compactRatio = conf.getFloat("hbase.hstore.compaction.ratio", 1.2F);
this.compactRatioOffPeak = conf.getFloat("hbase.hstore.compaction.ratio.offpeak", 5.0F);
// Peak time is from [offPeakStartHour, offPeakEndHour). Valid numbers are [0, 23]
this.offPeakStartHour = conf.getInt("hbase.offpeak.start.hour", -1);
this.offPeakEndHour = conf.getInt("hbase.offpeak.end.hour", -1);
if (!isValidHour(this.offPeakStartHour) || !isValidHour(this.offPeakEndHour)) {
if (!(this.offPeakStartHour == -1 && this.offPeakEndHour == -1)) {
LOG.warn("Invalid start/end hour for peak hour : start = " +
this.offPeakStartHour + " end = " + this.offPeakEndHour +
". Valid numbers are [0-23]");
}
this.offPeakStartHour = this.offPeakEndHour = -1;
}
this.isOffPeakCompaction = false;
}
/**
@ -113,49 +89,25 @@ public class CompactSelection {
}
if (hasExpiredStoreFiles) {
expiredSFSelection = new CompactSelection(conf, expiredStoreFiles);
expiredSFSelection = new CompactSelection(expiredStoreFiles);
}
return expiredSFSelection;
}
/**
* If the current hour falls in the off peak times and there are no
* outstanding off peak compactions, the current compaction is
* promoted to an off peak compaction. Currently only one off peak
* compaction is present in the compaction queue.
*
* @param currentHour
* @return
*/
public double getCompactSelectionRatio() {
double r = this.compactRatio;
synchronized(compactionCountLock) {
if (isOffPeakHour() && numOutstandingOffPeakCompactions == 0) {
r = this.compactRatioOffPeak;
numOutstandingOffPeakCompactions++;
isOffPeakCompaction = true;
}
}
if(isOffPeakCompaction) {
LOG.info("Running an off-peak compaction, selection ratio = " +
compactRatioOffPeak + ", numOutstandingOffPeakCompactions is now " +
numOutstandingOffPeakCompactions);
}
return r;
}
/**
* The current compaction finished, so reset the off peak compactions count
* if this was an off peak compaction.
*/
public void finishRequest() {
if (isOffPeakCompaction) {
long newValueToLog = -1;
synchronized(compactionCountLock) {
numOutstandingOffPeakCompactions--;
assert !isOffPeakCompaction : "Double-counting off-peak count for compaction";
newValueToLog = --numOutstandingOffPeakCompactions;
isOffPeakCompaction = false;
}
LOG.info("Compaction done, numOutstandingOffPeakCompactions is now " +
numOutstandingOffPeakCompactions);
newValueToLog);
}
}
@ -170,13 +122,14 @@ public class CompactSelection {
public void emptyFileList() {
filesToCompact.clear();
if (isOffPeakCompaction) {
long newValueToLog = -1;
synchronized(compactionCountLock) {
// reset the off peak count
numOutstandingOffPeakCompactions--;
newValueToLog = --numOutstandingOffPeakCompactions;
isOffPeakCompaction = false;
}
LOG.info("Nothing to compact, numOutstandingOffPeakCompactions is now " +
numOutstandingOffPeakCompactions);
newValueToLog);
}
}
@ -184,16 +137,30 @@ public class CompactSelection {
return this.isOffPeakCompaction;
}
private boolean isOffPeakHour() {
int currentHour = (new GregorianCalendar()).get(Calendar.HOUR_OF_DAY);
// If offpeak time checking is disabled just return false.
if (this.offPeakStartHour == this.offPeakEndHour) {
return false;
public static long getNumOutStandingOffPeakCompactions() {
synchronized(compactionCountLock) {
return numOutstandingOffPeakCompactions;
}
if (this.offPeakStartHour < this.offPeakEndHour) {
return (currentHour >= this.offPeakStartHour && currentHour < this.offPeakEndHour);
}
/**
* Tries making the compaction off-peak.
* Only checks internal compaction constraints, not timing.
* @return Eventual value of isOffPeakCompaction.
*/
public boolean trySetOffpeak() {
assert !isOffPeakCompaction : "Double-setting off-peak for compaction " + this;
synchronized(compactionCountLock) {
if (numOutstandingOffPeakCompactions == 0) {
numOutstandingOffPeakCompactions++;
isOffPeakCompaction = true;
}
}
return (currentHour >= this.offPeakStartHour || currentHour < this.offPeakEndHour);
return isOffPeakCompaction;
}
public long getSelectionTime() {
return selectionTime;
}
public CompactSelection subList(int start, int end) {

View File

@ -0,0 +1,214 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.compactions;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.regionserver.StoreConfiguration;
/**
* Compaction configuration for a particular instance of HStore.
* Takes into account both global settings and ones set on the column family/store.
* Control knobs for default compaction algorithm:
* <p/>
* maxCompactSize - upper bound on file size to be included in minor compactions
* minCompactSize - lower bound below which compaction is selected without ratio test
* minFilesToCompact - lower bound on number of files in any minor compaction
* maxFilesToCompact - upper bound on number of files in any minor compaction
* compactionRatio - Ratio used for compaction
* <p/>
* Set parameter as "hbase.hstore.compaction.<attribute>"
*/
//TODO: revisit this class for online parameter updating (both in xml and on the CF)
@InterfaceAudience.Private
public class CompactionConfiguration {
static final Log LOG = LogFactory.getLog(CompactionConfiguration.class);
private static final String CONFIG_PREFIX = "hbase.hstore.compaction.";
Configuration conf;
StoreConfiguration storeConfig;
long maxCompactSize;
long minCompactSize;
int minFilesToCompact;
int maxFilesToCompact;
double compactionRatio;
double offPeekCompactionRatio;
int offPeakStartHour;
int offPeakEndHour;
long throttlePoint;
boolean shouldDeleteExpired;
long majorCompactionPeriod;
float majorCompactionJitter;
CompactionConfiguration(Configuration conf, StoreConfiguration storeConfig) {
this.conf = conf;
this.storeConfig = storeConfig;
maxCompactSize = conf.getLong(CONFIG_PREFIX + "max.size", Long.MAX_VALUE);
minCompactSize = conf.getLong(CONFIG_PREFIX + "min.size",
storeConfig.getMemstoreFlushSize());
minFilesToCompact = Math.max(2, conf.getInt(CONFIG_PREFIX + "min",
/*old name*/ conf.getInt("hbase.hstore.compactionThreshold", 3)));
maxFilesToCompact = conf.getInt(CONFIG_PREFIX + "max", 10);
compactionRatio = conf.getFloat(CONFIG_PREFIX + "ratio", 1.2F);
offPeekCompactionRatio = conf.getFloat(CONFIG_PREFIX + "ratio.offpeak", 5.0F);
offPeakStartHour = conf.getInt("hbase.offpeak.start.hour", -1);
offPeakEndHour = conf.getInt("hbase.offpeak.end.hour", -1);
if (!isValidHour(offPeakStartHour) || !isValidHour(offPeakEndHour)) {
if (!(offPeakStartHour == -1 && offPeakEndHour == -1)) {
LOG.warn("Ignoring invalid start/end hour for peak hour : start = " +
this.offPeakStartHour + " end = " + this.offPeakEndHour +
". Valid numbers are [0-23]");
}
this.offPeakStartHour = this.offPeakEndHour = -1;
}
throttlePoint = conf.getLong("hbase.regionserver.thread.compaction.throttle",
2 * maxFilesToCompact * storeConfig.getMemstoreFlushSize());
shouldDeleteExpired = conf.getBoolean("hbase.store.delete.expired.storefile", true);
majorCompactionPeriod = conf.getLong(HConstants.MAJOR_COMPACTION_PERIOD, 1000*60*60*24);
majorCompactionJitter = conf.getFloat("hbase.hregion.majorcompaction.jitter", 0.20F);
LOG.info("Compaction configuration " + this.toString());
}
@Override
public String toString() {
return String.format(
"size [%d, %d); files [%d, %d); ratio %f; off-peak ratio %f; off-peak hours %d-%d; "
+ "throttle point %d;%s delete expired; major period %d, major jitter %f",
minCompactSize,
maxCompactSize,
minFilesToCompact,
maxFilesToCompact,
compactionRatio,
offPeekCompactionRatio,
offPeakStartHour,
offPeakEndHour,
throttlePoint,
shouldDeleteExpired ? "" : " don't",
majorCompactionPeriod,
majorCompactionJitter);
}
/**
* @return lower bound below which compaction is selected without ratio test
*/
long getMinCompactSize() {
return minCompactSize;
}
/**
* @return upper bound on file size to be included in minor compactions
*/
long getMaxCompactSize() {
return maxCompactSize;
}
/**
* @return upper bound on number of files to be included in minor compactions
*/
int getMinFilesToCompact() {
return minFilesToCompact;
}
/**
* @return upper bound on number of files to be included in minor compactions
*/
int getMaxFilesToCompact() {
return maxFilesToCompact;
}
/**
* @return Ratio used for compaction
*/
double getCompactionRatio() {
return compactionRatio;
}
/**
* @return Off peak Ratio used for compaction
*/
double getCompactionRatioOffPeak() {
return offPeekCompactionRatio;
}
/**
* @return Hour at which off-peak compactions start
*/
int getOffPeakStartHour() {
return offPeakStartHour;
}
/**
* @return Hour at which off-peak compactions end
*/
int getOffPeakEndHour() {
return offPeakEndHour;
}
/**
* @return ThrottlePoint used for classifying small and large compactions
*/
long getThrottlePoint() {
return throttlePoint;
}
/**
* @return Major compaction period from compaction.
* Major compactions are selected periodically according to this parameter plus jitter
*/
long getMajorCompactionPeriod() {
if (storeConfig != null) {
Long storeSpecificPeriod = storeConfig.getMajorCompactionPeriod();
if (storeSpecificPeriod != null) {
return storeSpecificPeriod;
}
}
return majorCompactionPeriod;
}
/**
* @return Major the jitter fraction, the fraction within which the major compaction
* period is randomly chosen from the majorCompactionPeriod in each store.
*/
float getMajorCompactionJitter() {
return majorCompactionJitter;
}
/**
* @return Whether expired files should be deleted ASAP using compactions
*/
boolean shouldDeleteExpired() {
return shouldDeleteExpired;
}
private static boolean isValidHour(int hour) {
return (hour >= 0 && hour <= 23);
}
}

View File

@ -0,0 +1,409 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.compactions;
import com.google.common.base.Predicate;
import com.google.common.collect.Collections2;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.regionserver.StoreConfiguration;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.util.StringUtils;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.GregorianCalendar;
import java.util.List;
import java.util.Random;
/**
* The default (and only, as of now) algorithm for selecting files for compaction.
* Combines the compaction configuration and the provisional file selection that
* it's given to produce the list of suitable candidates for compaction.
*/
@InterfaceAudience.Private
public class CompactionPolicy {
private static final Log LOG = LogFactory.getLog(CompactionPolicy.class);
private final static Calendar calendar = new GregorianCalendar();
CompactionConfiguration comConf;
StoreConfiguration storeConfig;
public CompactionPolicy(Configuration configuration, StoreConfiguration storeConfig) {
updateConfiguration(configuration, storeConfig);
}
/**
* @param candidateFiles candidate files, ordered from oldest to newest
* @return subset copy of candidate list that meets compaction criteria
* @throws java.io.IOException
*/
public CompactSelection selectCompaction(List<StoreFile> candidateFiles,
boolean isUserCompaction, boolean forceMajor)
throws IOException {
// Prelimanry compaction subject to filters
CompactSelection candidateSelection = new CompactSelection(candidateFiles);
long cfTtl = this.storeConfig.getStoreFileTtl();
if (!forceMajor) {
// If there are expired files, only select them so that compaction deletes them
if (comConf.shouldDeleteExpired() && (cfTtl != Long.MAX_VALUE)) {
CompactSelection expiredSelection = selectExpiredStoreFiles(
candidateSelection, EnvironmentEdgeManager.currentTimeMillis() - cfTtl);
if (expiredSelection != null) {
return expiredSelection;
}
}
candidateSelection = skipLargeFiles(candidateSelection);
}
// Force a major compaction if this is a user-requested major compaction,
// or if we do not have too many files to compact and this was requested
// as a major compaction.
// Or, if there are any references among the candidates.
boolean majorCompaction = (
(forceMajor && isUserCompaction)
|| ((forceMajor || isMajorCompaction(candidateSelection.getFilesToCompact()))
&& (candidateSelection.getFilesToCompact().size() < comConf.getMaxFilesToCompact()))
|| StoreUtils.hasReferences(candidateSelection.getFilesToCompact())
);
if (!majorCompaction) {
// we're doing a minor compaction, let's see what files are applicable
candidateSelection = filterBulk(candidateSelection);
candidateSelection = applyCompactionPolicy(candidateSelection);
candidateSelection = checkMinFilesCriteria(candidateSelection);
}
candidateSelection =
removeExcessFiles(candidateSelection, isUserCompaction, majorCompaction);
return candidateSelection;
}
/**
* Updates the compaction configuration. Used for tests.
* TODO: replace when HBASE-3909 is completed in some form.
*/
public void updateConfiguration(Configuration configuration,
StoreConfiguration storeConfig) {
this.comConf = new CompactionConfiguration(configuration, storeConfig);
this.storeConfig = storeConfig;
}
/**
* Select the expired store files to compact
*
* @param candidates the initial set of storeFiles
* @param maxExpiredTimeStamp
* The store file will be marked as expired if its max time stamp is
* less than this maxExpiredTimeStamp.
* @return A CompactSelection contains the expired store files as
* filesToCompact
*/
private CompactSelection selectExpiredStoreFiles(
CompactSelection candidates, long maxExpiredTimeStamp) {
List<StoreFile> filesToCompact = candidates.getFilesToCompact();
if (filesToCompact == null || filesToCompact.size() == 0)
return null;
ArrayList<StoreFile> expiredStoreFiles = null;
boolean hasExpiredStoreFiles = false;
CompactSelection expiredSFSelection = null;
for (StoreFile storeFile : filesToCompact) {
if (storeFile.getReader().getMaxTimestamp() < maxExpiredTimeStamp) {
LOG.info("Deleting the expired store file by compaction: "
+ storeFile.getPath() + " whose maxTimeStamp is "
+ storeFile.getReader().getMaxTimestamp()
+ " while the max expired timestamp is " + maxExpiredTimeStamp);
if (!hasExpiredStoreFiles) {
expiredStoreFiles = new ArrayList<StoreFile>();
hasExpiredStoreFiles = true;
}
expiredStoreFiles.add(storeFile);
}
}
if (hasExpiredStoreFiles) {
expiredSFSelection = new CompactSelection(expiredStoreFiles);
}
return expiredSFSelection;
}
/**
* @param candidates pre-filtrate
* @return filtered subset
* exclude all files above maxCompactSize
* Also save all references. We MUST compact them
*/
private CompactSelection skipLargeFiles(CompactSelection candidates) {
int pos = 0;
while (pos < candidates.getFilesToCompact().size() &&
candidates.getFilesToCompact().get(pos).getReader().length() >
comConf.getMaxCompactSize() &&
!candidates.getFilesToCompact().get(pos).isReference()) {
++pos;
}
if (pos > 0) {
LOG.debug("Some files are too large. Excluding " + pos
+ " files from compaction candidates");
candidates.clearSubList(0, pos);
}
return candidates;
}
/**
* @param candidates pre-filtrate
* @return filtered subset
* exclude all bulk load files if configured
*/
private CompactSelection filterBulk(CompactSelection candidates) {
candidates.getFilesToCompact().removeAll(Collections2.filter(
candidates.getFilesToCompact(),
new Predicate<StoreFile>() {
@Override
public boolean apply(StoreFile input) {
return input.excludeFromMinorCompaction();
}
}));
return candidates;
}
/**
* @param candidates pre-filtrate
* @return filtered subset
* take upto maxFilesToCompact from the start
*/
private CompactSelection removeExcessFiles(CompactSelection candidates,
boolean isUserCompaction, boolean isMajorCompaction) {
int excess = candidates.getFilesToCompact().size() - comConf.getMaxFilesToCompact();
if (excess > 0) {
if (isMajorCompaction && isUserCompaction) {
LOG.debug("Warning, compacting more than " + comConf.getMaxFilesToCompact() +
" files because of a user-requested major compaction");
} else {
LOG.debug("Too many admissible files. Excluding " + excess
+ " files from compaction candidates");
candidates.clearSubList(comConf.getMaxFilesToCompact(),
candidates.getFilesToCompact().size());
}
}
return candidates;
}
/**
* @param candidates pre-filtrate
* @return filtered subset
* forget the compactionSelection if we don't have enough files
*/
private CompactSelection checkMinFilesCriteria(CompactSelection candidates) {
int minFiles = comConf.getMinFilesToCompact();
if (candidates.getFilesToCompact().size() < minFiles) {
if(LOG.isDebugEnabled()) {
LOG.debug("Not compacting files because we only have " +
candidates.getFilesToCompact().size() +
" files ready for compaction. Need " + minFiles + " to initiate.");
}
candidates.emptyFileList();
}
return candidates;
}
/**
* @param candidates pre-filtrate
* @return filtered subset
* -- Default minor compaction selection algorithm:
* choose CompactSelection from candidates --
* First exclude bulk-load files if indicated in configuration.
* Start at the oldest file and stop when you find the first file that
* meets compaction criteria:
* (1) a recently-flushed, small file (i.e. <= minCompactSize)
* OR
* (2) within the compactRatio of sum(newer_files)
* Given normal skew, any newer files will also meet this criteria
* <p/>
* Additional Note:
* If fileSizes.size() >> maxFilesToCompact, we will recurse on
* compact(). Consider the oldest files first to avoid a
* situation where we always compact [end-threshold,end). Then, the
* last file becomes an aggregate of the previous compactions.
*
* normal skew:
*
* older ----> newer (increasing seqID)
* _
* | | _
* | | | | _
* --|-|- |-|- |-|---_-------_------- minCompactSize
* | | | | | | | | _ | |
* | | | | | | | | | | | |
* | | | | | | | | | | | |
*/
CompactSelection applyCompactionPolicy(CompactSelection candidates) throws IOException {
if (candidates.getFilesToCompact().isEmpty()) {
return candidates;
}
// we're doing a minor compaction, let's see what files are applicable
int start = 0;
double ratio = comConf.getCompactionRatio();
if (isOffPeakHour() && candidates.trySetOffpeak()) {
ratio = comConf.getCompactionRatioOffPeak();
LOG.info("Running an off-peak compaction, selection ratio = " + ratio
+ ", numOutstandingOffPeakCompactions is now "
+ CompactSelection.getNumOutStandingOffPeakCompactions());
}
// get store file sizes for incremental compacting selection.
int countOfFiles = candidates.getFilesToCompact().size();
long[] fileSizes = new long[countOfFiles];
long[] sumSize = new long[countOfFiles];
for (int i = countOfFiles - 1; i >= 0; --i) {
StoreFile file = candidates.getFilesToCompact().get(i);
fileSizes[i] = file.getReader().length();
// calculate the sum of fileSizes[i,i+maxFilesToCompact-1) for algo
int tooFar = i + comConf.getMaxFilesToCompact() - 1;
sumSize[i] = fileSizes[i]
+ ((i + 1 < countOfFiles) ? sumSize[i + 1] : 0)
- ((tooFar < countOfFiles) ? fileSizes[tooFar] : 0);
}
while (countOfFiles - start >= comConf.getMinFilesToCompact() &&
fileSizes[start] > Math.max(comConf.getMinCompactSize(),
(long) (sumSize[start + 1] * ratio))) {
++start;
}
if (start < countOfFiles) {
LOG.info("Default compaction algorithm has selected " + (countOfFiles - start)
+ " files from " + countOfFiles + " candidates");
}
candidates = candidates.getSubList(start, countOfFiles);
return candidates;
}
/*
* @param filesToCompact Files to compact. Can be null.
* @return True if we should run a major compaction.
*/
public boolean isMajorCompaction(final List<StoreFile> filesToCompact)
throws IOException {
boolean result = false;
long mcTime = getNextMajorCompactTime(filesToCompact);
if (filesToCompact == null || filesToCompact.isEmpty() || mcTime == 0) {
return result;
}
// TODO: Use better method for determining stamp of last major (HBASE-2990)
long lowTimestamp = StoreUtils.getLowestTimestamp(filesToCompact);
long now = System.currentTimeMillis();
if (lowTimestamp > 0l && lowTimestamp < (now - mcTime)) {
// Major compaction time has elapsed.
long cfTtl = this.storeConfig.getStoreFileTtl();
if (filesToCompact.size() == 1) {
// Single file
StoreFile sf = filesToCompact.get(0);
Long minTimestamp = sf.getMinimumTimestamp();
long oldest = (minTimestamp == null)
? Long.MIN_VALUE
: now - minTimestamp.longValue();
if (sf.isMajorCompaction() &&
(cfTtl == HConstants.FOREVER || oldest < cfTtl)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Skipping major compaction of " + this +
" because one (major) compacted file only and oldestTime " +
oldest + "ms is < ttl=" + cfTtl);
}
} else if (cfTtl != HConstants.FOREVER && oldest > cfTtl) {
LOG.debug("Major compaction triggered on store " + this +
", because keyvalues outdated; time since last major compaction " +
(now - lowTimestamp) + "ms");
result = true;
}
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Major compaction triggered on store " + this +
"; time since last major compaction " + (now - lowTimestamp) + "ms");
}
result = true;
}
}
return result;
}
public long getNextMajorCompactTime(final List<StoreFile> filesToCompact) {
// default = 24hrs
long ret = comConf.getMajorCompactionPeriod();
if (ret > 0) {
// default = 20% = +/- 4.8 hrs
double jitterPct = comConf.getMajorCompactionJitter();
if (jitterPct > 0) {
long jitter = Math.round(ret * jitterPct);
// deterministic jitter avoids a major compaction storm on restart
Integer seed = StoreUtils.getDeterministicRandomSeed(filesToCompact);
if (seed != null) {
double rnd = (new Random(seed)).nextDouble();
ret += jitter - Math.round(2L * jitter * rnd);
} else {
ret = 0; // no storefiles == no major compaction
}
}
}
return ret;
}
/**
* @param compactionSize Total size of some compaction
* @return whether this should be a large or small compaction
*/
public boolean throttleCompaction(long compactionSize) {
return compactionSize > comConf.getThrottlePoint();
}
/**
* @param numCandidates Number of candidate store files
* @return whether a compactionSelection is possible
*/
public boolean needsCompaction(int numCandidates) {
return numCandidates > comConf.getMinFilesToCompact();
}
/**
* @return whether this is off-peak hour
*/
private boolean isOffPeakHour() {
int currentHour = calendar.get(Calendar.HOUR_OF_DAY);
int startHour = comConf.getOffPeakStartHour();
int endHour = comConf.getOffPeakEndHour();
// If offpeak time checking is disabled just return false.
if (startHour == endHour) {
return false;
}
if (startHour < endHour) {
return (currentHour >= startHour && currentHour < endHour);
}
return (currentHour >= startHour || currentHour < endHour);
}
}

View File

@ -208,6 +208,10 @@ public class CompactionRequest implements Comparable<CompactionRequest>,
return p;
}
public long getSelectionTime() {
return compactSelection.getSelectionTime();
}
/** Gets the priority for the request */
public void setPriority(int p) {
this.p = p;
@ -271,7 +275,7 @@ public class CompactionRequest implements Comparable<CompactionRequest>,
server.checkFileSystem();
} finally {
s.finishRequest(this);
LOG.debug("CompactSplitThread status: " + server.compactSplitThread);
LOG.debug("CompactSplitThread Status: " + server.compactSplitThread);
}
}

View File

@ -49,8 +49,7 @@ import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.compactions.*;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.experimental.categories.Category;
@ -302,6 +301,7 @@ public class TestCompaction extends HBaseTestCase {
conf.setFloat("hbase.hregion.majorcompaction.jitter", jitterPct);
HStore s = ((HStore) r.getStore(COLUMN_FAMILY));
s.compactionPolicy.updateConfiguration(conf, s);
try {
createStoreFile(r);
createStoreFile(r);
@ -313,9 +313,11 @@ public class TestCompaction extends HBaseTestCase {
assertEquals(2, s.getStorefilesCount());
// ensure that major compaction time is deterministic
long mcTime = s.getNextMajorCompactTime();
CompactionPolicy c = s.compactionPolicy;
List<StoreFile> storeFiles = s.getStorefiles();
long mcTime = c.getNextMajorCompactTime(storeFiles);
for (int i = 0; i < 10; ++i) {
assertEquals(mcTime, s.getNextMajorCompactTime());
assertEquals(mcTime, c.getNextMajorCompactTime(storeFiles));
}
// ensure that the major compaction time is within the variance

View File

@ -1,5 +1,4 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@ -26,6 +25,7 @@ import java.util.GregorianCalendar;
import java.util.List;
import junit.framework.TestCase;
import org.junit.experimental.categories.Category;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -35,30 +35,31 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.NoOpDataBlockEncoder;
import org.apache.hadoop.hbase.regionserver.compactions.CompactSelection;
import org.apache.hadoop.hbase.regionserver.compactions.*;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import com.google.common.collect.Lists;
import org.junit.experimental.categories.Category;
@Category(SmallTests.class)
public class TestCompactSelection extends TestCase {
private final static Log LOG = LogFactory.getLog(TestCompactSelection.class);
public class TestDefaultCompactSelection extends TestCase {
private final static Log LOG = LogFactory.getLog(TestDefaultCompactSelection.class);
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private Configuration conf;
private HStore store;
protected Configuration conf;
protected HStore store;
private static final String DIR=
TEST_UTIL.getDataTestDir("TestCompactSelection").toString();
TEST_UTIL.getDataTestDir(TestDefaultCompactSelection.class.getSimpleName()).toString();
private static Path TEST_FILE;
private CompactionPolicy manager;
private static final int minFiles = 3;
private static final int maxFiles = 5;
protected static final int minFiles = 3;
protected static final int maxFiles = 5;
private static final long minSize = 10;
private static final long maxSize = 1000;
protected static final long minSize = 10;
protected static final long maxSize = 1000;
@Override
@ -94,6 +95,8 @@ public class TestCompactSelection extends TestCase {
region = new HRegion(tableDir, hlog, fs, conf, info, htd, null);
store = new HStore(basedir, region, hcd, fs, conf);
manager = store.compactionPolicy;
TEST_FILE = StoreFile.getRandomFilename(fs, store.getHomedir());
fs.create(TEST_FILE);
}
@ -102,14 +105,17 @@ public class TestCompactSelection extends TestCase {
static class MockStoreFile extends StoreFile {
long length = 0;
boolean isRef = false;
long ageInDisk;
long sequenceid;
MockStoreFile(long length, boolean isRef) throws IOException {
super(TEST_UTIL.getTestFileSystem(), TEST_FILE,
TEST_UTIL.getConfiguration(),
MockStoreFile(long length, long ageInDisk, boolean isRef, long sequenceid) throws IOException {
super(TEST_UTIL.getTestFileSystem(), TEST_FILE, TEST_UTIL.getConfiguration(),
new CacheConfig(TEST_UTIL.getConfiguration()), BloomType.NONE,
NoOpDataBlockEncoder.INSTANCE);
this.length = length;
this.isRef = isRef;
this.isRef = isRef;
this.ageInDisk = ageInDisk;
this.sequenceid = sequenceid;
}
void setLength(long newLen) {
@ -117,12 +123,17 @@ public class TestCompactSelection extends TestCase {
}
@Override
boolean isMajorCompaction() {
public long getMaxSequenceId() {
return sequenceid;
}
@Override
public boolean isMajorCompaction() {
return false;
}
@Override
boolean isReference() {
public boolean isReference() {
return this.isRef;
}
@ -138,29 +149,54 @@ public class TestCompactSelection extends TestCase {
}
}
List<StoreFile> sfCreate(long ... sizes) throws IOException {
return sfCreate(false, sizes);
ArrayList<Long> toArrayList(long... numbers) {
ArrayList<Long> result = new ArrayList<Long>();
for (long i : numbers) {
result.add(i);
}
return result;
}
List<StoreFile> sfCreate(boolean isReference, long ... sizes)
throws IOException {
List<StoreFile> sfCreate(long... sizes) throws IOException {
ArrayList<Long> ageInDisk = new ArrayList<Long>();
for (int i = 0; i < sizes.length; i++) {
ageInDisk.add(0L);
}
return sfCreate(toArrayList(sizes), ageInDisk);
}
List<StoreFile> sfCreate(ArrayList<Long> sizes, ArrayList<Long> ageInDisk)
throws IOException {
return sfCreate(false, sizes, ageInDisk);
}
List<StoreFile> sfCreate(boolean isReference, long... sizes) throws IOException {
ArrayList<Long> ageInDisk = new ArrayList<Long>(sizes.length);
for (int i = 0; i < sizes.length; i++) {
ageInDisk.add(0L);
}
return sfCreate(isReference, toArrayList(sizes), ageInDisk);
}
List<StoreFile> sfCreate(boolean isReference, ArrayList<Long> sizes, ArrayList<Long> ageInDisk)
throws IOException {
List<StoreFile> ret = Lists.newArrayList();
for (long i : sizes) {
ret.add(new MockStoreFile(i, isReference));
for (int i = 0; i < sizes.size(); i++) {
ret.add(new MockStoreFile(sizes.get(i), ageInDisk.get(i), isReference, i));
}
return ret;
}
long[] getSizes(List<StoreFile> sfList) {
long[] aNums = new long[sfList.size()];
for (int i=0; i <sfList.size(); ++i) {
for (int i = 0; i < sfList.size(); ++i) {
aNums[i] = sfList.get(i).getReader().length();
}
return aNums;
}
void compactEquals(List<StoreFile> candidates, long ... expected)
throws IOException {
void compactEquals(List<StoreFile> candidates, long... expected)
throws IOException {
compactEquals(candidates, false, expected);
}
@ -168,13 +204,15 @@ public class TestCompactSelection extends TestCase {
long ... expected)
throws IOException {
store.forceMajor = forcemajor;
List<StoreFile> actual = store.compactSelection(candidates).getFilesToCompact();
store.forceMajor = false;
//Test Default compactions
List<StoreFile> actual = store.compactionPolicy
.selectCompaction(candidates, false, forcemajor).getFilesToCompact();
assertEquals(Arrays.toString(expected), Arrays.toString(getSizes(actual)));
store.forceMajor = false;
}
public void testCompactionRatio() throws IOException {
/*
/**
* NOTE: these tests are specific to describe the implementation of the
* current compaction algorithm. Developed to ensure that refactoring
* doesn't implicitly alter this.
@ -192,13 +230,11 @@ public class TestCompactSelection extends TestCase {
// small files = don't care about ratio
compactEquals(sfCreate(8,3,1), 8,3,1);
/* TODO: add sorting + unit test back in when HBASE-2856 is fixed
// sort first so you don't include huge file the tail end
// sort first so you don't include huge file the tail end.
// happens with HFileOutputFormat bulk migration
compactEquals(sfCreate(100,50,23,12,12, 500), 23, 12, 12);
*/
// don't exceed max file compact threshold
assertEquals(maxFiles,
store.compactSelection(sfCreate(7,6,5,4,3,2,1)).getFilesToCompact().size());
// note: file selection starts with largest to smallest.
compactEquals(sfCreate(7, 6, 5, 4, 3, 2, 1), 7, 6, 5, 4, 3);
@ -213,13 +249,13 @@ public class TestCompactSelection extends TestCase {
store.forceMajor = true;
compactEquals(sfCreate(7, 6, 5, 4, 3, 2, 1), 7, 6, 5, 4, 3);
store.forceMajor = false;
// if we exceed maxCompactSize, downgrade to minor
// if not, it creates a 'snowball effect' when files >> maxCompactSize:
// the last file in compaction is the aggregate of all previous compactions
compactEquals(sfCreate(100,50,23,12,12), true, 23, 12, 12);
conf.setLong(HConstants.MAJOR_COMPACTION_PERIOD, 1);
conf.setFloat("hbase.hregion.majorcompaction.jitter", 0);
store.compactionPolicy.updateConfiguration(conf, store);
try {
// trigger an aged major compaction
compactEquals(sfCreate(50,25,12,12), 50, 25, 12, 12);
@ -236,15 +272,12 @@ public class TestCompactSelection extends TestCase {
// reference files shouldn't obey max threshold
compactEquals(sfCreate(true, tooBig, 12,12), tooBig, 12, 12);
// reference files should obey max file compact to avoid OOM
assertEquals(maxFiles,
store.compactSelection(sfCreate(true, 7,6,5,4,3,2,1)).getFilesToCompact().size());
// reference compaction
compactEquals(sfCreate(true, 7, 6, 5, 4, 3, 2, 1), 5, 4, 3, 2, 1);
compactEquals(sfCreate(true, 7, 6, 5, 4, 3, 2, 1), 7, 6, 5, 4, 3);
// empty case
compactEquals(new ArrayList<StoreFile>() /* empty */);
// empty case (because all files are too big)
compactEquals(sfCreate(tooBig, tooBig) /* empty */);
compactEquals(sfCreate(tooBig, tooBig) /* empty */);
}
public void testOffPeakCompactionRatio() throws IOException {
@ -258,7 +291,7 @@ public class TestCompactSelection extends TestCase {
Calendar calendar = new GregorianCalendar();
int hourOfDay = calendar.get(Calendar.HOUR_OF_DAY);
LOG.debug("Hour of day = " + hourOfDay);
int hourPlusOne = ((hourOfDay+1+24)%24);
int hourPlusOne = ((hourOfDay+1)%24);
int hourMinusOne = ((hourOfDay-1+24)%24);
int hourMinusTwo = ((hourOfDay-2+24)%24);
@ -274,15 +307,15 @@ public class TestCompactSelection extends TestCase {
this.conf.setLong("hbase.offpeak.end.hour", hourPlusOne);
LOG.debug("Testing compact selection with off-peak settings (" +
hourMinusOne + ", " + hourPlusOne + ")");
compactEquals(sfCreate(999,50,12,12, 1), 50, 12, 12, 1);
store.compactionPolicy.updateConfiguration(this.conf, store);
compactEquals(sfCreate(999, 50, 12, 12, 1), 50, 12, 12, 1);
// set peak hour outside current selection and check compact selection
this.conf.setLong("hbase.offpeak.start.hour", hourMinusTwo);
this.conf.setLong("hbase.offpeak.end.hour", hourMinusOne);
store.compactionPolicy.updateConfiguration(this.conf, store);
LOG.debug("Testing compact selection with off-peak settings (" +
hourMinusTwo + ", " + hourMinusOne + ")");
compactEquals(sfCreate(999,50,12,12, 1), 12, 12, 1);
}
}

View File

@ -248,17 +248,15 @@ public class TestStore extends TestCase {
flush(i);
}
// after flush; check the lowest time stamp
long lowestTimeStampFromStore =
HStore.getLowestTimestamp(store.getStorefiles());
long lowestTimeStampFromFS =
getLowestTimeStampFromFS(fs,store.getStorefiles());
assertEquals(lowestTimeStampFromStore,lowestTimeStampFromFS);
long lowestTimeStampFromManager = StoreUtils.getLowestTimestamp(store.getStorefiles());
long lowestTimeStampFromFS = getLowestTimeStampFromFS(fs, store.getStorefiles());
assertEquals(lowestTimeStampFromManager,lowestTimeStampFromFS);
// after compact; check the lowest time stamp
store.compact(store.requestCompaction());
lowestTimeStampFromStore = HStore.getLowestTimestamp(store.getStorefiles());
lowestTimeStampFromFS = getLowestTimeStampFromFS(fs,store.getStorefiles());
assertEquals(lowestTimeStampFromStore,lowestTimeStampFromFS);
lowestTimeStampFromManager = StoreUtils.getLowestTimestamp(store.getStorefiles());
lowestTimeStampFromFS = getLowestTimeStampFromFS(fs, store.getStorefiles());
assertEquals(lowestTimeStampFromManager, lowestTimeStampFromFS);
}
private static long getLowestTimeStampFromFS(FileSystem fs,