From 9806433ee9ab44e686622d9435bc105da29ecb9a Mon Sep 17 00:00:00 2001 From: Michael Stack Date: Tue, 27 Nov 2012 18:33:28 +0000 Subject: [PATCH] HBASE-7110 refactor the compaction selection and config code similarly to 0.89-fb changes; REAPPLY v9 git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1414308 13f79535-47bb-0310-9956-ffa450edef68 --- .../hadoop/hbase/regionserver/HRegion.java | 2 +- .../hadoop/hbase/regionserver/HStore.java | 417 +++--------------- .../regionserver/StoreConfiguration.java | 50 +++ .../hadoop/hbase/regionserver/StoreFile.java | 13 +- .../hadoop/hbase/regionserver/StoreUtils.java | 64 +++ .../compactions/CompactSelection.java | 105 ++--- .../compactions/CompactionConfiguration.java | 214 +++++++++ .../compactions/CompactionPolicy.java | 409 +++++++++++++++++ .../compactions/CompactionRequest.java | 6 +- .../hbase/regionserver/TestCompaction.java | 10 +- .../TestDefaultCompactSelection.java | 321 ++++++++++++++ .../hadoop/hbase/regionserver/TestStore.java | 16 +- 12 files changed, 1187 insertions(+), 440 deletions(-) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreConfiguration.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreUtils.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionPolicy.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultCompactSelection.java diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index c70e9ab6410..c01a1a118c8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -4172,7 +4172,7 @@ public class HRegion implements HeapSize { // , Writable{ } /** - * @return True if needs a mojor compaction. + * @return True if needs a major compaction. 
* @throws IOException */ boolean isMajorCompaction() throws IOException { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index e135f8a7d53..3f13698e1e3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.regionserver; import java.io.IOException; +import java.lang.reflect.InvocationTargetException; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -63,9 +64,7 @@ import org.apache.hadoop.hbase.io.hfile.HFileScanner; import org.apache.hadoop.hbase.io.hfile.InvalidHFileException; import org.apache.hadoop.hbase.io.hfile.NoOpDataBlockEncoder; import org.apache.hadoop.hbase.monitoring.MonitoredTask; -import org.apache.hadoop.hbase.regionserver.compactions.CompactSelection; -import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress; -import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; +import org.apache.hadoop.hbase.regionserver.compactions.*; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ChecksumType; import org.apache.hadoop.hbase.util.ClassSize; @@ -75,8 +74,6 @@ import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.util.StringUtils; import com.google.common.base.Preconditions; -import com.google.common.base.Predicate; -import com.google.common.collect.Collections2; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; @@ -103,8 +100,9 @@ import com.google.common.collect.Lists; *

Locking and transactions are handled at a higher level. This API should * not be called directly but by an HRegion manager. */ +//TODO: move StoreConfiguration implementation into a separate class. @InterfaceAudience.Private -public class HStore implements Store { +public class HStore implements Store, StoreConfiguration { static final Log LOG = LogFactory.getLog(HStore.class); protected final MemStore memstore; @@ -112,15 +110,12 @@ public class HStore implements Store { private final Path homedir; private final HRegion region; private final HColumnDescriptor family; + CompactionPolicy compactionPolicy; final FileSystem fs; final Configuration conf; final CacheConfig cacheConf; - // ttl in milliseconds. + // ttl in milliseconds. TODO: can this be removed? Already stored in scanInfo. private long ttl; - private final int minFilesToCompact; - private final int maxFilesToCompact; - private final long minCompactSize; - private final long maxCompactSize; private long lastCompactSize = 0; volatile boolean forceMajor = false; /* how many bytes to write between status checks */ @@ -193,7 +188,7 @@ public class HStore implements Store { this.comparator = info.getComparator(); // Get TTL - this.ttl = getTTL(family); + this.ttl = determineTTLFromFamily(family); // used by ScanQueryMatcher long timeToPurgeDeletes = Math.max(conf.getLong("hbase.hstore.time.to.purge.deletes", 0), 0); @@ -204,23 +199,11 @@ public class HStore implements Store { scanInfo = new ScanInfo(family, ttl, timeToPurgeDeletes, this.comparator); this.memstore = new MemStore(conf, this.comparator); - // By default, compact if storefile.count >= minFilesToCompact - this.minFilesToCompact = Math.max(2, - conf.getInt("hbase.hstore.compaction.min", - /*old name*/ conf.getInt("hbase.hstore.compactionThreshold", 3))); - LOG.info("hbase.hstore.compaction.min = " + this.minFilesToCompact); - // Setting up cache configuration for this family this.cacheConf = new CacheConfig(conf, family); this.blockingStoreFileCount = conf.getInt("hbase.hstore.blockingStoreFiles", 7); - this.maxFilesToCompact = conf.getInt("hbase.hstore.compaction.max", 10); - this.minCompactSize = conf.getLong("hbase.hstore.compaction.min.size", - this.region.memstoreFlushSize); - this.maxCompactSize - = conf.getLong("hbase.hstore.compaction.max.size", Long.MAX_VALUE); - this.verifyBulkLoads = conf.getBoolean("hbase.hstore.bulkload.verify", false); if (HStore.closeCheckInterval == 0) { @@ -234,14 +217,16 @@ public class HStore implements Store { // initilize bytes per checksum this.bytesPerChecksum = getBytesPerChecksum(conf); // Create a compaction tool instance - this.compactor = new Compactor(this.conf); + this.compactor = new Compactor(conf); + // Create a compaction manager. + this.compactionPolicy = new CompactionPolicy(conf, this); } /** * @param family * @return */ - long getTTL(final HColumnDescriptor family) { + private static long determineTTLFromFamily(final HColumnDescriptor family) { // HCD.getTimeToLive returns ttl in seconds. Convert to milliseconds. long ttl = family.getTimeToLive(); if (ttl == HConstants.FOREVER) { @@ -285,6 +270,22 @@ public class HStore implements Store { return this.fs; } + /* Implementation of StoreConfiguration */ + public long getStoreFileTtl() { + // TTL only applies if there's no MIN_VERSIONs setting on the column. + return (this.scanInfo.getMinVersions() == 0) ? 
this.ttl : Long.MAX_VALUE; + } + + public Long getMajorCompactionPeriod() { + String strCompactionTime = this.family.getValue(HConstants.MAJOR_COMPACTION_PERIOD); + return (strCompactionTime != null) ? new Long(strCompactionTime) : null; + } + + public long getMemstoreFlushSize() { + return this.region.memstoreFlushSize; + } + /* End implementation of StoreConfiguration */ + /** * Returns the configured bytesPerChecksum value. * @param conf The configuration @@ -352,7 +353,8 @@ public class HStore implements Store { * @param family family name of this store * @return Path to the family/Store home directory */ - public static Path getStoreHomedir(final Path parentRegionDirectory, final byte[] family) { + public static Path getStoreHomedir(final Path parentRegionDirectory, + final byte[] family) { return new Path(parentRegionDirectory, new Path(Bytes.toString(family))); } /** @@ -566,7 +568,8 @@ public class HStore implements Store { "the destination store. Copying file over to destination filesystem."); Path tmpPath = getTmpPath(); FileUtil.copy(srcFs, srcPath, fs, tmpPath, false, conf); - LOG.info("Copied " + srcPath + " to temporary path on destination filesystem: " + tmpPath); + LOG.info("Copied " + srcPath + + " to temporary path on destination filesystem: " + tmpPath); srcPath = tmpPath; } @@ -663,8 +666,8 @@ public class HStore implements Store { /** * Snapshot this stores memstore. Call before running - * {@link #flushCache(long, SortedSet, TimeRangeTracker, AtomicLong, MonitoredTask)} so it has - * some work to do. + * {@link #flushCache(long, SortedSet, TimeRangeTracker, AtomicLong, MonitoredTask)} + * so it has some work to do. */ void snapshot() { this.memstore.snapshot(); @@ -722,7 +725,8 @@ public class HStore implements Store { InternalScanner scanner = null; KeyValueScanner memstoreScanner = new CollectionBackedScanner(set, this.comparator); if (getHRegion().getCoprocessorHost() != null) { - scanner = getHRegion().getCoprocessorHost().preFlushScannerOpen(this, memstoreScanner); + scanner = getHRegion().getCoprocessorHost() + .preFlushScannerOpen(this, memstoreScanner); } if (scanner == null) { Scan scan = new Scan(); @@ -759,7 +763,8 @@ public class HStore implements Store { if (!kvs.isEmpty()) { for (KeyValue kv : kvs) { // If we know that this KV is going to be included always, then let us - // set its memstoreTS to 0. This will help us save space when writing to disk. + // set its memstoreTS to 0. This will help us save space when writing to + // disk. if (kv.getMemstoreTS() <= smallestReadPoint) { // let us not change the original KV. It could be in the memstore // changing its memstoreTS could affect other threads/scanners. @@ -774,7 +779,8 @@ public class HStore implements Store { } while (hasMore); } finally { // Write out the log sequence number that corresponds to this output - // hfile. The hfile is current up to and including logCacheFlushId. + // hfile. Also write current time in metadata as minFlushTime. + // The hfile is current up to and including logCacheFlushId. status.setStatus("Flushing " + this + ": appending metadata"); writer.appendMetadata(logCacheFlushId, false); status.setStatus("Flushing " + this + ": closing flushed file"); @@ -1004,12 +1010,12 @@ public class HStore implements Store { // Ready to go. Have list of files to compact. 
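+    // compactionStartTime (captured below) and the request's selection time are used
+    // after the compaction to log how long the selection waited in the queue and how
+    // long the compaction itself took.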
LOG.info("Starting compaction of " + filesToCompact.size() + " file(s) in " - + this + " of " - + this.region.getRegionInfo().getRegionNameAsString() + + this + " of " + this.region.getRegionInfo().getRegionNameAsString() + " into tmpdir=" + region.getTmpDir() + ", seqid=" + maxId + ", totalSize=" + StringUtils.humanReadableInt(cr.getSize())); StoreFile sf = null; + long compactionStartTime = EnvironmentEdgeManager.currentTimeMillis(); try { StoreFile.Writer writer = this.compactor.compact(this, filesToCompact, cr.isMajor(), maxId); @@ -1031,6 +1037,7 @@ public class HStore implements Store { } } + long now = EnvironmentEdgeManager.currentTimeMillis(); LOG.info("Completed" + (cr.isMajor() ? " major " : " ") + "compaction of " + filesToCompact.size() + " file(s) in " + this + " of " + this.region.getRegionInfo().getRegionNameAsString() @@ -1038,8 +1045,11 @@ public class HStore implements Store { (sf == null ? "none" : sf.getPath().getName()) + ", size=" + (sf == null ? "none" : StringUtils.humanReadableInt(sf.getReader().length())) - + "; total size for store is " - + StringUtils.humanReadableInt(storeSize)); + + "; total size for store is " + StringUtils.humanReadableInt(storeSize) + + ". This selection was in queue for " + + StringUtils.formatTimeDiff(compactionStartTime, cr.getSelectionTime()) + + ", and took " + StringUtils.formatTimeDiff(now, compactionStartTime) + + " to execute."); return sf; } @@ -1094,38 +1104,7 @@ public class HStore implements Store { @Override public boolean hasReferences() { - return hasReferences(this.storefiles); - } - - /* - * @param files - * @return True if any of the files in files are References. - */ - private boolean hasReferences(Collection files) { - if (files != null && files.size() > 0) { - for (StoreFile hsf: files) { - if (hsf.isReference()) { - return true; - } - } - } - return false; - } - - /* - * Gets lowest timestamp from candidate StoreFiles - * - * @param fs - * @param dir - * @throws IOException - */ - public static long getLowestTimestamp(final List candidates) - throws IOException { - long minTs = Long.MAX_VALUE; - for (StoreFile storeFile : candidates) { - minTs = Math.min(minTs, storeFile.getModificationTimeStamp()); - } - return minTs; + return StoreUtils.hasReferences(this.storefiles); } @Override @@ -1143,91 +1122,7 @@ public class HStore implements Store { } List candidates = new ArrayList(this.storefiles); - - // exclude files above the max compaction threshold - // except: save all references. we MUST compact them - int pos = 0; - while (pos < candidates.size() && - candidates.get(pos).getReader().length() > this.maxCompactSize && - !candidates.get(pos).isReference()) ++pos; - candidates.subList(0, pos).clear(); - - return isMajorCompaction(candidates); - } - - /* - * @param filesToCompact Files to compact. Can be null. - * @return True if we should run a major compaction. - */ - private boolean isMajorCompaction(final List filesToCompact) throws IOException { - boolean result = false; - long mcTime = getNextMajorCompactTime(); - if (filesToCompact == null || filesToCompact.isEmpty() || mcTime == 0) { - return result; - } - // TODO: Use better method for determining stamp of last major (HBASE-2990) - long lowTimestamp = getLowestTimestamp(filesToCompact); - long now = System.currentTimeMillis(); - if (lowTimestamp > 0l && lowTimestamp < (now - mcTime)) { - // Major compaction time has elapsed. 
- if (filesToCompact.size() == 1) { - // Single file - StoreFile sf = filesToCompact.get(0); - long oldest = - (sf.getReader().timeRangeTracker == null) ? - Long.MIN_VALUE : - now - sf.getReader().timeRangeTracker.minimumTimestamp; - if (sf.isMajorCompaction() && - (this.ttl == HConstants.FOREVER || oldest < this.ttl)) { - if (LOG.isDebugEnabled()) { - LOG.debug("Skipping major compaction of " + this + - " because one (major) compacted file only and oldestTime " + - oldest + "ms is < ttl=" + this.ttl); - } - } else if (this.ttl != HConstants.FOREVER && oldest > this.ttl) { - LOG.debug("Major compaction triggered on store " + this + - ", because keyvalues outdated; time since last major compaction " + - (now - lowTimestamp) + "ms"); - result = true; - } - } else { - if (LOG.isDebugEnabled()) { - LOG.debug("Major compaction triggered on store " + this + - "; time since last major compaction " + (now - lowTimestamp) + "ms"); - } - result = true; - } - } - return result; - } - - long getNextMajorCompactTime() { - // default = 24hrs - long ret = conf.getLong(HConstants.MAJOR_COMPACTION_PERIOD, 1000*60*60*24); - if (family.getValue(HConstants.MAJOR_COMPACTION_PERIOD) != null) { - String strCompactionTime = - family.getValue(HConstants.MAJOR_COMPACTION_PERIOD); - ret = (new Long(strCompactionTime)).longValue(); - } - - if (ret > 0) { - // default = 20% = +/- 4.8 hrs - double jitterPct = conf.getFloat("hbase.hregion.majorcompaction.jitter", - 0.20F); - if (jitterPct > 0) { - long jitter = Math.round(ret * jitterPct); - // deterministic jitter avoids a major compaction storm on restart - ImmutableList snapshot = storefiles; - if (snapshot != null && !snapshot.isEmpty()) { - String seed = snapshot.get(0).getPath().getName(); - double curRand = new Random(seed.hashCode()).nextDouble(); - ret += jitter - Math.round(2L * jitter * curRand); - } else { - ret = 0; // no storefiles == no major compaction - } - } - } - return ret; + return compactionPolicy.isMajorCompaction(candidates); } public CompactionRequest requestCompaction() throws IOException { @@ -1263,9 +1158,11 @@ public class HStore implements Store { CompactSelection filesToCompact; if (override) { // coprocessor is overriding normal file selection - filesToCompact = new CompactSelection(conf, candidates); + filesToCompact = new CompactSelection(candidates); } else { - filesToCompact = compactSelection(candidates, priority); + boolean isUserCompaction = priority == Store.PRIORITY_USER; + filesToCompact = compactionPolicy.selectCompaction(candidates, isUserCompaction, + forceMajor && filesCompacting.isEmpty()); } if (region.getCoprocessorHost() != null) { @@ -1288,12 +1185,17 @@ public class HStore implements Store { Collections.sort(filesCompacting, StoreFile.Comparators.SEQ_ID); // major compaction iff all StoreFiles are included - boolean isMajor = (filesToCompact.getFilesToCompact().size() == this.storefiles.size()); + boolean isMajor = + (filesToCompact.getFilesToCompact().size() == this.storefiles.size()); if (isMajor) { // since we're enqueuing a major, update the compaction wait interval this.forceMajor = false; } + LOG.debug(getHRegion().regionInfo.getEncodedName() + " - " + + getColumnFamilyName() + ": Initiating " + + (isMajor ? "major" : "minor") + " compaction"); + // everything went better than expected. 
create a compaction request int pri = getCompactPriority(priority); ret = new CompactionRequest(region, this, filesToCompact, isMajor, pri); @@ -1315,191 +1217,6 @@ public class HStore implements Store { } } - /** - * Algorithm to choose which files to compact, see {@link #compactSelection(java.util.List, int)} - * @param candidates - * @return - * @throws IOException - */ - CompactSelection compactSelection(List candidates) throws IOException { - return compactSelection(candidates,Store.NO_PRIORITY); - } - - /** - * Algorithm to choose which files to compact - * - * Configuration knobs: - * "hbase.hstore.compaction.ratio" - * normal case: minor compact when file <= sum(smaller_files) * ratio - * "hbase.hstore.compaction.min.size" - * unconditionally compact individual files below this size - * "hbase.hstore.compaction.max.size" - * never compact individual files above this size (unless splitting) - * "hbase.hstore.compaction.min" - * min files needed to minor compact - * "hbase.hstore.compaction.max" - * max files to compact at once (avoids OOM) - * - * @param candidates candidate files, ordered from oldest to newest - * @return subset copy of candidate list that meets compaction criteria - * @throws IOException - */ - CompactSelection compactSelection(List candidates, int priority) - throws IOException { - // ASSUMPTION!!! filesCompacting is locked when calling this function - - /* normal skew: - * - * older ----> newer - * _ - * | | _ - * | | | | _ - * --|-|- |-|- |-|---_-------_------- minCompactSize - * | | | | | | | | _ | | - * | | | | | | | | | | | | - * | | | | | | | | | | | | - */ - CompactSelection compactSelection = new CompactSelection(conf, candidates); - - boolean forcemajor = this.forceMajor && filesCompacting.isEmpty(); - if (!forcemajor) { - // Delete the expired store files before the compaction selection. - if (conf.getBoolean("hbase.store.delete.expired.storefile", true) - && (ttl != Long.MAX_VALUE) && (this.scanInfo.minVersions == 0)) { - CompactSelection expiredSelection = compactSelection - .selectExpiredStoreFilesToCompact( - EnvironmentEdgeManager.currentTimeMillis() - this.ttl); - - // If there is any expired store files, delete them by compaction. - if (expiredSelection != null) { - return expiredSelection; - } - } - // do not compact old files above a configurable threshold - // save all references. we MUST compact them - int pos = 0; - while (pos < compactSelection.getFilesToCompact().size() && - compactSelection.getFilesToCompact().get(pos).getReader().length() - > maxCompactSize && - !compactSelection.getFilesToCompact().get(pos).isReference()) ++pos; - if (pos != 0) compactSelection.clearSubList(0, pos); - } - - if (compactSelection.getFilesToCompact().isEmpty()) { - LOG.debug(this.getHRegionInfo().getEncodedName() + " - " + - this + ": no store files to compact"); - compactSelection.emptyFileList(); - return compactSelection; - } - - // Force a major compaction if this is a user-requested major compaction, - // or if we do not have too many files to compact and this was requested - // as a major compaction - boolean majorcompaction = (forcemajor && priority == Store.PRIORITY_USER) || - (forcemajor || isMajorCompaction(compactSelection.getFilesToCompact())) && - (compactSelection.getFilesToCompact().size() < this.maxFilesToCompact - ); - LOG.debug(this.getHRegionInfo().getEncodedName() + " - " - + this.getColumnFamilyName() + ": Initiating " + - (majorcompaction ? 
"major" : "minor") + "compaction"); - - if (!majorcompaction && - !hasReferences(compactSelection.getFilesToCompact())) { - // we're doing a minor compaction, let's see what files are applicable - int start = 0; - double r = compactSelection.getCompactSelectionRatio(); - - // remove bulk import files that request to be excluded from minors - compactSelection.getFilesToCompact().removeAll(Collections2.filter( - compactSelection.getFilesToCompact(), - new Predicate() { - public boolean apply(StoreFile input) { - return input.excludeFromMinorCompaction(); - } - })); - - // skip selection algorithm if we don't have enough files - if (compactSelection.getFilesToCompact().size() < this.minFilesToCompact) { - if(LOG.isDebugEnabled()) { - LOG.debug("Not compacting files because we only have " + - compactSelection.getFilesToCompact().size() + - " files ready for compaction. Need " + this.minFilesToCompact + " to initiate."); - } - compactSelection.emptyFileList(); - return compactSelection; - } - - /* TODO: add sorting + unit test back in when HBASE-2856 is fixed - // Sort files by size to correct when normal skew is altered by bulk load. - Collections.sort(filesToCompact, StoreFile.Comparators.FILE_SIZE); - */ - - // get store file sizes for incremental compacting selection. - int countOfFiles = compactSelection.getFilesToCompact().size(); - long [] fileSizes = new long[countOfFiles]; - long [] sumSize = new long[countOfFiles]; - for (int i = countOfFiles-1; i >= 0; --i) { - StoreFile file = compactSelection.getFilesToCompact().get(i); - fileSizes[i] = file.getReader().length(); - // calculate the sum of fileSizes[i,i+maxFilesToCompact-1) for algo - int tooFar = i + this.maxFilesToCompact - 1; - sumSize[i] = fileSizes[i] - + ((i+1 < countOfFiles) ? sumSize[i+1] : 0) - - ((tooFar < countOfFiles) ? fileSizes[tooFar] : 0); - } - - /* Start at the oldest file and stop when you find the first file that - * meets compaction criteria: - * (1) a recently-flushed, small file (i.e. <= minCompactSize) - * OR - * (2) within the compactRatio of sum(newer_files) - * Given normal skew, any newer files will also meet this criteria - * - * Additional Note: - * If fileSizes.size() >> maxFilesToCompact, we will recurse on - * compact(). Consider the oldest files first to avoid a - * situation where we always compact [end-threshold,end). Then, the - * last file becomes an aggregate of the previous compactions. - */ - while(countOfFiles - start >= this.minFilesToCompact && - fileSizes[start] > - Math.max(minCompactSize, (long)(sumSize[start+1] * r))) { - ++start; - } - int end = Math.min(countOfFiles, start + this.maxFilesToCompact); - long totalSize = fileSizes[start] - + ((start+1 < countOfFiles) ? sumSize[start+1] : 0); - compactSelection = compactSelection.getSubList(start, end); - - // if we don't have enough files to compact, just wait - if (compactSelection.getFilesToCompact().size() < this.minFilesToCompact) { - if (LOG.isDebugEnabled()) { - LOG.debug("Skipped compaction of " + this - + ". 
Only " + (end - start) + " file(s) of size " - + StringUtils.humanReadableInt(totalSize) - + " have met compaction criteria."); - } - compactSelection.emptyFileList(); - return compactSelection; - } - } else { - if(majorcompaction) { - if (compactSelection.getFilesToCompact().size() > this.maxFilesToCompact) { - LOG.debug("Warning, compacting more than " + this.maxFilesToCompact + - " files, probably because of a user-requested major compaction"); - if(priority != Store.PRIORITY_USER) { - LOG.error("Compacting more than max files on a non user-requested compaction"); - } - } - } else if (compactSelection.getFilesToCompact().size() > this.maxFilesToCompact) { - // all files included in this compaction, up to max - int pastMax = compactSelection.getFilesToCompact().size() - this.maxFilesToCompact; - compactSelection.getFilesToCompact().subList(0, pastMax).clear(); - } - } - return compactSelection; - } - /** * Validates a store file by opening and closing it. In HFileV2 this should * not be an expensive operation. @@ -1597,8 +1314,8 @@ public class HStore implements Store { // let the archive util decide if we should archive or delete the files LOG.debug("Removing store files after compaction..."); - HFileArchiver.archiveStoreFiles(this.fs, this.region, this.conf, this.family.getName(), - compactedFiles); + HFileArchiver.archiveStoreFiles(this.fs, this.region, this.conf, + this.family.getName(), compactedFiles); } catch (IOException e) { e = RemoteExceptionHandler.checkIOException(e); @@ -2005,11 +1722,7 @@ public class HStore implements Store { @Override public boolean throttleCompaction(long compactionSize) { - // see HBASE-5867 for discussion on the default - long throttlePoint = conf.getLong( - "hbase.regionserver.thread.compaction.throttle", - 2 * this.minFilesToCompact * this.region.memstoreFlushSize); - return compactionSize > throttlePoint; + return compactionPolicy.throttleCompaction(compactionSize); } @Override @@ -2115,7 +1828,7 @@ public class HStore implements Store { @Override public boolean needsCompaction() { - return (storefiles.size() - filesCompacting.size()) > minFilesToCompact; + return compactionPolicy.needsCompaction(storefiles.size() - filesCompacting.size()); } @Override @@ -2124,8 +1837,8 @@ public class HStore implements Store { } public static final long FIXED_OVERHEAD = - ClassSize.align((19 * ClassSize.REFERENCE) + (6 * Bytes.SIZEOF_LONG) - + (5 * Bytes.SIZEOF_INT) + Bytes.SIZEOF_BOOLEAN); + ClassSize.align((20 * ClassSize.REFERENCE) + (4 * Bytes.SIZEOF_LONG) + + (3 * Bytes.SIZEOF_INT) + Bytes.SIZEOF_BOOLEAN); public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD + ClassSize.OBJECT + ClassSize.REENTRANT_LOCK diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreConfiguration.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreConfiguration.java new file mode 100644 index 00000000000..8ffd4eb741a --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreConfiguration.java @@ -0,0 +1,50 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +/** + * The class that contains shared information about various knobs of a Store/HStore object. + * Unlike the configuration objects that merely return the XML values, the implementations + * should return ready-to-use applicable values for corresponding calls, after all the + * parsing/validation/adjustment for other considerations, so that we don't have to repeat + * this logic in multiple places. + * TODO: move methods and logic here as necessary. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +public interface StoreConfiguration { + /** + * Gets the cf-specific major compaction period. + */ + public Long getMajorCompactionPeriod(); + + + /** + * Gets the Memstore flush size for the region that this store works with. + */ + public long getMemstoreFlushSize(); + + /** + * Gets the cf-specific time-to-live for store files. + */ + public long getStoreFileTtl(); +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java index a240f94c9ac..d62c40888cb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java @@ -65,6 +65,7 @@ import org.apache.hadoop.hbase.util.BloomFilter; import org.apache.hadoop.hbase.util.BloomFilterFactory; import org.apache.hadoop.hbase.util.BloomFilterWriter; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.Writables; import org.apache.hadoop.io.RawComparator; @@ -295,7 +296,7 @@ public class StoreFile { * @return True if this is a StoreFile Reference; call after {@link #open()} * else may get wrong answer. */ - boolean isReference() { + public boolean isReference() { return this.reference != null; } @@ -357,7 +358,7 @@ public class StoreFile { /** * @return True if this file was made by a major compaction. */ - boolean isMajorCompaction() { + public boolean isMajorCompaction() { if (this.majorCompaction == null) { throw new NullPointerException("This has not been set yet"); } @@ -367,7 +368,7 @@ public class StoreFile { /** * @return True if this file should not be part of a minor compaction. */ - boolean excludeFromMinorCompaction() { + public boolean excludeFromMinorCompaction() { return this.excludeFromMinorCompaction; } @@ -579,7 +580,6 @@ public class StoreFile { } } } - this.reader.setSequenceID(this.sequenceid); b = metadataMap.get(HFileWriterV2.MAX_MEMSTORE_TS_KEY); @@ -945,6 +945,11 @@ public class StoreFile { return r.write(fs, p); } + public Long getMinimumTimestamp() { + return (getReader().timeRangeTracker == null) ? + null : + getReader().timeRangeTracker.minimumTimestamp; + } /** * A StoreFile writer. Use this to read/write HBase Store Files. 
It is package diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreUtils.java new file mode 100644 index 00000000000..e7b61928ed3 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreUtils.java @@ -0,0 +1,64 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import java.io.IOException; +import java.util.Collection; +import java.util.List; + +/** + * Utility functions for region server storage layer. + */ +public class StoreUtils { + /** + * Creates a deterministic hash code for store file collection. + */ + public static Integer getDeterministicRandomSeed(final List files) { + if (files != null && !files.isEmpty()) { + return files.get(0).getPath().getName().hashCode(); + } + return null; + } + + /** + * Determines whether any files in the collection are references. + */ + public static boolean hasReferences(final Collection files) { + if (files != null && files.size() > 0) { + for (StoreFile hsf: files) { + if (hsf.isReference()) { + return true; + } + } + } + return false; + } + + /** + * Gets lowest timestamp from candidate StoreFiles + */ + public static long getLowestTimestamp(final List candidates) + throws IOException { + long minTs = Long.MAX_VALUE; + for (StoreFile storeFile : candidates) { + minTs = Math.min(minTs, storeFile.getModificationTimeStamp()); + } + return minTs; + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactSelection.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactSelection.java index d240b87e08e..66d8a0f9e72 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactSelection.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactSelection.java @@ -19,15 +19,13 @@ package org.apache.hadoop.hbase.regionserver.compactions; import java.util.ArrayList; -import java.util.Calendar; -import java.util.GregorianCalendar; import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.regionserver.StoreFile; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; @InterfaceAudience.Private public class CompactSelection { @@ -48,37 +46,15 @@ public class CompactSelection { */ private final static Object compactionCountLock = new Object(); - // HBase conf object - Configuration conf; // was this compaction promoted to an off-peak boolean isOffPeakCompaction = false; - // 
compactRatio: double on purpose! Float.MAX < Long.MAX < Double.MAX - // With float, java will downcast your long to float for comparisons (bad) - private double compactRatio; - // compaction ratio off-peak - private double compactRatioOffPeak; - // offpeak start time - private int offPeakStartHour = -1; - // off peak end time - private int offPeakEndHour = -1; + // CompactSelection object creation time. + private final long selectionTime; - public CompactSelection(Configuration conf, List filesToCompact) { + public CompactSelection(List filesToCompact) { + this.selectionTime = EnvironmentEdgeManager.currentTimeMillis(); this.filesToCompact = filesToCompact; - this.conf = conf; - this.compactRatio = conf.getFloat("hbase.hstore.compaction.ratio", 1.2F); - this.compactRatioOffPeak = conf.getFloat("hbase.hstore.compaction.ratio.offpeak", 5.0F); - - // Peak time is from [offPeakStartHour, offPeakEndHour). Valid numbers are [0, 23] - this.offPeakStartHour = conf.getInt("hbase.offpeak.start.hour", -1); - this.offPeakEndHour = conf.getInt("hbase.offpeak.end.hour", -1); - if (!isValidHour(this.offPeakStartHour) || !isValidHour(this.offPeakEndHour)) { - if (!(this.offPeakStartHour == -1 && this.offPeakEndHour == -1)) { - LOG.warn("Invalid start/end hour for peak hour : start = " + - this.offPeakStartHour + " end = " + this.offPeakEndHour + - ". Valid numbers are [0-23]"); - } - this.offPeakStartHour = this.offPeakEndHour = -1; - } + this.isOffPeakCompaction = false; } /** @@ -113,49 +89,25 @@ public class CompactSelection { } if (hasExpiredStoreFiles) { - expiredSFSelection = new CompactSelection(conf, expiredStoreFiles); + expiredSFSelection = new CompactSelection(expiredStoreFiles); } return expiredSFSelection; } - /** - * If the current hour falls in the off peak times and there are no - * outstanding off peak compactions, the current compaction is - * promoted to an off peak compaction. Currently only one off peak - * compaction is present in the compaction queue. - * - * @param currentHour - * @return - */ - public double getCompactSelectionRatio() { - double r = this.compactRatio; - synchronized(compactionCountLock) { - if (isOffPeakHour() && numOutstandingOffPeakCompactions == 0) { - r = this.compactRatioOffPeak; - numOutstandingOffPeakCompactions++; - isOffPeakCompaction = true; - } - } - if(isOffPeakCompaction) { - LOG.info("Running an off-peak compaction, selection ratio = " + - compactRatioOffPeak + ", numOutstandingOffPeakCompactions is now " + - numOutstandingOffPeakCompactions); - } - return r; - } - /** * The current compaction finished, so reset the off peak compactions count * if this was an off peak compaction. 
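+   * Pairs with {@link #trySetOffpeak()}; CompactionRequest invokes this from its
+   * finally block once the compaction attempt finishes.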
*/ public void finishRequest() { if (isOffPeakCompaction) { + long newValueToLog = -1; synchronized(compactionCountLock) { - numOutstandingOffPeakCompactions--; + assert !isOffPeakCompaction : "Double-counting off-peak count for compaction"; + newValueToLog = --numOutstandingOffPeakCompactions; isOffPeakCompaction = false; } LOG.info("Compaction done, numOutstandingOffPeakCompactions is now " + - numOutstandingOffPeakCompactions); + newValueToLog); } } @@ -170,13 +122,14 @@ public class CompactSelection { public void emptyFileList() { filesToCompact.clear(); if (isOffPeakCompaction) { + long newValueToLog = -1; synchronized(compactionCountLock) { // reset the off peak count - numOutstandingOffPeakCompactions--; + newValueToLog = --numOutstandingOffPeakCompactions; isOffPeakCompaction = false; } LOG.info("Nothing to compact, numOutstandingOffPeakCompactions is now " + - numOutstandingOffPeakCompactions); + newValueToLog); } } @@ -184,16 +137,30 @@ public class CompactSelection { return this.isOffPeakCompaction; } - private boolean isOffPeakHour() { - int currentHour = (new GregorianCalendar()).get(Calendar.HOUR_OF_DAY); - // If offpeak time checking is disabled just return false. - if (this.offPeakStartHour == this.offPeakEndHour) { - return false; + public static long getNumOutStandingOffPeakCompactions() { + synchronized(compactionCountLock) { + return numOutstandingOffPeakCompactions; } - if (this.offPeakStartHour < this.offPeakEndHour) { - return (currentHour >= this.offPeakStartHour && currentHour < this.offPeakEndHour); + } + + /** + * Tries making the compaction off-peak. + * Only checks internal compaction constraints, not timing. + * @return Eventual value of isOffPeakCompaction. + */ + public boolean trySetOffpeak() { + assert !isOffPeakCompaction : "Double-setting off-peak for compaction " + this; + synchronized(compactionCountLock) { + if (numOutstandingOffPeakCompactions == 0) { + numOutstandingOffPeakCompactions++; + isOffPeakCompaction = true; + } } - return (currentHour >= this.offPeakStartHour || currentHour < this.offPeakEndHour); + return isOffPeakCompaction; + } + + public long getSelectionTime() { + return selectionTime; } public CompactSelection subList(int start, int end) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java new file mode 100644 index 00000000000..ea568854c0b --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java @@ -0,0 +1,214 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.regionserver.compactions; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.regionserver.StoreConfiguration; + +/** + * Compaction configuration for a particular instance of HStore. + * Takes into account both global settings and ones set on the column family/store. + * Control knobs for default compaction algorithm: + *

+ * maxCompactSize - upper bound on file size to be included in minor compactions + * minCompactSize - lower bound below which compaction is selected without ratio test + * minFilesToCompact - lower bound on number of files in any minor compaction + * maxFilesToCompact - upper bound on number of files in any minor compaction + * compactionRatio - Ratio used for compaction + *
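+ * For example, compactionRatio above is read from "hbase.hstore.compaction.ratio"
+ * (default 1.2F in the constructor below).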

+ * Set parameter as "hbase.hstore.compaction." + */ + +//TODO: revisit this class for online parameter updating (both in xml and on the CF) +@InterfaceAudience.Private +public class CompactionConfiguration { + + static final Log LOG = LogFactory.getLog(CompactionConfiguration.class); + + private static final String CONFIG_PREFIX = "hbase.hstore.compaction."; + + Configuration conf; + StoreConfiguration storeConfig; + + long maxCompactSize; + long minCompactSize; + int minFilesToCompact; + int maxFilesToCompact; + double compactionRatio; + double offPeekCompactionRatio; + int offPeakStartHour; + int offPeakEndHour; + long throttlePoint; + boolean shouldDeleteExpired; + long majorCompactionPeriod; + float majorCompactionJitter; + + CompactionConfiguration(Configuration conf, StoreConfiguration storeConfig) { + this.conf = conf; + this.storeConfig = storeConfig; + + maxCompactSize = conf.getLong(CONFIG_PREFIX + "max.size", Long.MAX_VALUE); + minCompactSize = conf.getLong(CONFIG_PREFIX + "min.size", + storeConfig.getMemstoreFlushSize()); + minFilesToCompact = Math.max(2, conf.getInt(CONFIG_PREFIX + "min", + /*old name*/ conf.getInt("hbase.hstore.compactionThreshold", 3))); + maxFilesToCompact = conf.getInt(CONFIG_PREFIX + "max", 10); + compactionRatio = conf.getFloat(CONFIG_PREFIX + "ratio", 1.2F); + offPeekCompactionRatio = conf.getFloat(CONFIG_PREFIX + "ratio.offpeak", 5.0F); + offPeakStartHour = conf.getInt("hbase.offpeak.start.hour", -1); + offPeakEndHour = conf.getInt("hbase.offpeak.end.hour", -1); + + if (!isValidHour(offPeakStartHour) || !isValidHour(offPeakEndHour)) { + if (!(offPeakStartHour == -1 && offPeakEndHour == -1)) { + LOG.warn("Ignoring invalid start/end hour for peak hour : start = " + + this.offPeakStartHour + " end = " + this.offPeakEndHour + + ". Valid numbers are [0-23]"); + } + this.offPeakStartHour = this.offPeakEndHour = -1; + } + + throttlePoint = conf.getLong("hbase.regionserver.thread.compaction.throttle", + 2 * maxFilesToCompact * storeConfig.getMemstoreFlushSize()); + shouldDeleteExpired = conf.getBoolean("hbase.store.delete.expired.storefile", true); + majorCompactionPeriod = conf.getLong(HConstants.MAJOR_COMPACTION_PERIOD, 1000*60*60*24); + majorCompactionJitter = conf.getFloat("hbase.hregion.majorcompaction.jitter", 0.20F); + + LOG.info("Compaction configuration " + this.toString()); + } + + @Override + public String toString() { + return String.format( + "size [%d, %d); files [%d, %d); ratio %f; off-peak ratio %f; off-peak hours %d-%d; " + + "throttle point %d;%s delete expired; major period %d, major jitter %f", + minCompactSize, + maxCompactSize, + minFilesToCompact, + maxFilesToCompact, + compactionRatio, + offPeekCompactionRatio, + offPeakStartHour, + offPeakEndHour, + throttlePoint, + shouldDeleteExpired ? 
"" : " don't", + majorCompactionPeriod, + majorCompactionJitter); + } + + /** + * @return lower bound below which compaction is selected without ratio test + */ + long getMinCompactSize() { + return minCompactSize; + } + + /** + * @return upper bound on file size to be included in minor compactions + */ + long getMaxCompactSize() { + return maxCompactSize; + } + + /** + * @return upper bound on number of files to be included in minor compactions + */ + int getMinFilesToCompact() { + return minFilesToCompact; + } + + /** + * @return upper bound on number of files to be included in minor compactions + */ + int getMaxFilesToCompact() { + return maxFilesToCompact; + } + + /** + * @return Ratio used for compaction + */ + double getCompactionRatio() { + return compactionRatio; + } + + /** + * @return Off peak Ratio used for compaction + */ + double getCompactionRatioOffPeak() { + return offPeekCompactionRatio; + } + + /** + * @return Hour at which off-peak compactions start + */ + int getOffPeakStartHour() { + return offPeakStartHour; + } + + /** + * @return Hour at which off-peak compactions end + */ + int getOffPeakEndHour() { + return offPeakEndHour; + } + + /** + * @return ThrottlePoint used for classifying small and large compactions + */ + long getThrottlePoint() { + return throttlePoint; + } + + /** + * @return Major compaction period from compaction. + * Major compactions are selected periodically according to this parameter plus jitter + */ + long getMajorCompactionPeriod() { + if (storeConfig != null) { + Long storeSpecificPeriod = storeConfig.getMajorCompactionPeriod(); + if (storeSpecificPeriod != null) { + return storeSpecificPeriod; + } + } + return majorCompactionPeriod; + } + + /** + * @return Major the jitter fraction, the fraction within which the major compaction + * period is randomly chosen from the majorCompactionPeriod in each store. + */ + float getMajorCompactionJitter() { + return majorCompactionJitter; + } + + /** + * @return Whether expired files should be deleted ASAP using compactions + */ + boolean shouldDeleteExpired() { + return shouldDeleteExpired; + } + + private static boolean isValidHour(int hour) { + return (hour >= 0 && hour <= 23); + } +} \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionPolicy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionPolicy.java new file mode 100644 index 00000000000..cf16a2b53cb --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionPolicy.java @@ -0,0 +1,409 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.regionserver.compactions; + +import com.google.common.base.Predicate; +import com.google.common.collect.Collections2; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.regionserver.StoreConfiguration; +import org.apache.hadoop.hbase.regionserver.StoreFile; +import org.apache.hadoop.hbase.regionserver.StoreUtils; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.util.StringUtils; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Calendar; +import java.util.GregorianCalendar; +import java.util.List; +import java.util.Random; + +/** + * The default (and only, as of now) algorithm for selecting files for compaction. + * Combines the compaction configuration and the provisional file selection that + * it's given to produce the list of suitable candidates for compaction. + */ +@InterfaceAudience.Private +public class CompactionPolicy { + + private static final Log LOG = LogFactory.getLog(CompactionPolicy.class); + private final static Calendar calendar = new GregorianCalendar(); + + CompactionConfiguration comConf; + StoreConfiguration storeConfig; + + public CompactionPolicy(Configuration configuration, StoreConfiguration storeConfig) { + updateConfiguration(configuration, storeConfig); + } + + /** + * @param candidateFiles candidate files, ordered from oldest to newest + * @return subset copy of candidate list that meets compaction criteria + * @throws java.io.IOException + */ + public CompactSelection selectCompaction(List candidateFiles, + boolean isUserCompaction, boolean forceMajor) + throws IOException { + // Prelimanry compaction subject to filters + CompactSelection candidateSelection = new CompactSelection(candidateFiles); + long cfTtl = this.storeConfig.getStoreFileTtl(); + if (!forceMajor) { + // If there are expired files, only select them so that compaction deletes them + if (comConf.shouldDeleteExpired() && (cfTtl != Long.MAX_VALUE)) { + CompactSelection expiredSelection = selectExpiredStoreFiles( + candidateSelection, EnvironmentEdgeManager.currentTimeMillis() - cfTtl); + if (expiredSelection != null) { + return expiredSelection; + } + } + candidateSelection = skipLargeFiles(candidateSelection); + } + + // Force a major compaction if this is a user-requested major compaction, + // or if we do not have too many files to compact and this was requested + // as a major compaction. + // Or, if there are any references among the candidates. + boolean majorCompaction = ( + (forceMajor && isUserCompaction) + || ((forceMajor || isMajorCompaction(candidateSelection.getFilesToCompact())) + && (candidateSelection.getFilesToCompact().size() < comConf.getMaxFilesToCompact())) + || StoreUtils.hasReferences(candidateSelection.getFilesToCompact()) + ); + + if (!majorCompaction) { + // we're doing a minor compaction, let's see what files are applicable + candidateSelection = filterBulk(candidateSelection); + candidateSelection = applyCompactionPolicy(candidateSelection); + candidateSelection = checkMinFilesCriteria(candidateSelection); + } + candidateSelection = + removeExcessFiles(candidateSelection, isUserCompaction, majorCompaction); + return candidateSelection; + } + + /** + * Updates the compaction configuration. 
Used for tests. + * TODO: replace when HBASE-3909 is completed in some form. + */ + public void updateConfiguration(Configuration configuration, + StoreConfiguration storeConfig) { + this.comConf = new CompactionConfiguration(configuration, storeConfig); + this.storeConfig = storeConfig; + } + + /** + * Select the expired store files to compact + * + * @param candidates the initial set of storeFiles + * @param maxExpiredTimeStamp + * The store file will be marked as expired if its max time stamp is + * less than this maxExpiredTimeStamp. + * @return A CompactSelection contains the expired store files as + * filesToCompact + */ + private CompactSelection selectExpiredStoreFiles( + CompactSelection candidates, long maxExpiredTimeStamp) { + List filesToCompact = candidates.getFilesToCompact(); + if (filesToCompact == null || filesToCompact.size() == 0) + return null; + ArrayList expiredStoreFiles = null; + boolean hasExpiredStoreFiles = false; + CompactSelection expiredSFSelection = null; + + for (StoreFile storeFile : filesToCompact) { + if (storeFile.getReader().getMaxTimestamp() < maxExpiredTimeStamp) { + LOG.info("Deleting the expired store file by compaction: " + + storeFile.getPath() + " whose maxTimeStamp is " + + storeFile.getReader().getMaxTimestamp() + + " while the max expired timestamp is " + maxExpiredTimeStamp); + if (!hasExpiredStoreFiles) { + expiredStoreFiles = new ArrayList(); + hasExpiredStoreFiles = true; + } + expiredStoreFiles.add(storeFile); + } + } + + if (hasExpiredStoreFiles) { + expiredSFSelection = new CompactSelection(expiredStoreFiles); + } + return expiredSFSelection; + } + + /** + * @param candidates pre-filtrate + * @return filtered subset + * exclude all files above maxCompactSize + * Also save all references. We MUST compact them + */ + private CompactSelection skipLargeFiles(CompactSelection candidates) { + int pos = 0; + while (pos < candidates.getFilesToCompact().size() && + candidates.getFilesToCompact().get(pos).getReader().length() > + comConf.getMaxCompactSize() && + !candidates.getFilesToCompact().get(pos).isReference()) { + ++pos; + } + if (pos > 0) { + LOG.debug("Some files are too large. Excluding " + pos + + " files from compaction candidates"); + candidates.clearSubList(0, pos); + } + return candidates; + } + + /** + * @param candidates pre-filtrate + * @return filtered subset + * exclude all bulk load files if configured + */ + private CompactSelection filterBulk(CompactSelection candidates) { + candidates.getFilesToCompact().removeAll(Collections2.filter( + candidates.getFilesToCompact(), + new Predicate() { + @Override + public boolean apply(StoreFile input) { + return input.excludeFromMinorCompaction(); + } + })); + return candidates; + } + + /** + * @param candidates pre-filtrate + * @return filtered subset + * take upto maxFilesToCompact from the start + */ + private CompactSelection removeExcessFiles(CompactSelection candidates, + boolean isUserCompaction, boolean isMajorCompaction) { + int excess = candidates.getFilesToCompact().size() - comConf.getMaxFilesToCompact(); + if (excess > 0) { + if (isMajorCompaction && isUserCompaction) { + LOG.debug("Warning, compacting more than " + comConf.getMaxFilesToCompact() + + " files because of a user-requested major compaction"); + } else { + LOG.debug("Too many admissible files. 
Excluding " + excess + + " files from compaction candidates"); + candidates.clearSubList(comConf.getMaxFilesToCompact(), + candidates.getFilesToCompact().size()); + } + } + return candidates; + } + /** + * @param candidates pre-filtrate + * @return filtered subset + * forget the compactionSelection if we don't have enough files + */ + private CompactSelection checkMinFilesCriteria(CompactSelection candidates) { + int minFiles = comConf.getMinFilesToCompact(); + if (candidates.getFilesToCompact().size() < minFiles) { + if(LOG.isDebugEnabled()) { + LOG.debug("Not compacting files because we only have " + + candidates.getFilesToCompact().size() + + " files ready for compaction. Need " + minFiles + " to initiate."); + } + candidates.emptyFileList(); + } + return candidates; + } + + /** + * @param candidates pre-filtrate + * @return filtered subset + * -- Default minor compaction selection algorithm: + * choose CompactSelection from candidates -- + * First exclude bulk-load files if indicated in configuration. + * Start at the oldest file and stop when you find the first file that + * meets compaction criteria: + * (1) a recently-flushed, small file (i.e. <= minCompactSize) + * OR + * (2) within the compactRatio of sum(newer_files) + * Given normal skew, any newer files will also meet this criteria + *
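+   *    A hypothetical example with the default ratio of 1.2: for candidate sizes
+   *    [250, 50, 23, 12, 12] (oldest to newest), 250 > 1.2 * (50 + 23 + 12 + 12) so the
+   *    250 file is skipped, while 50 <= 1.2 * (23 + 12 + 12), so [50, 23, 12, 12] is
+   *    selected (assuming minCompactSize is below these sizes).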

+ * Additional Note: + * If fileSizes.size() >> maxFilesToCompact, we will recurse on + * compact(). Consider the oldest files first to avoid a + * situation where we always compact [end-threshold,end). Then, the + * last file becomes an aggregate of the previous compactions. + * + * normal skew: + * + * older ----> newer (increasing seqID) + * _ + * | | _ + * | | | | _ + * --|-|- |-|- |-|---_-------_------- minCompactSize + * | | | | | | | | _ | | + * | | | | | | | | | | | | + * | | | | | | | | | | | | + */ + CompactSelection applyCompactionPolicy(CompactSelection candidates) throws IOException { + if (candidates.getFilesToCompact().isEmpty()) { + return candidates; + } + + // we're doing a minor compaction, let's see what files are applicable + int start = 0; + double ratio = comConf.getCompactionRatio(); + if (isOffPeakHour() && candidates.trySetOffpeak()) { + ratio = comConf.getCompactionRatioOffPeak(); + LOG.info("Running an off-peak compaction, selection ratio = " + ratio + + ", numOutstandingOffPeakCompactions is now " + + CompactSelection.getNumOutStandingOffPeakCompactions()); + } + + // get store file sizes for incremental compacting selection. + int countOfFiles = candidates.getFilesToCompact().size(); + long[] fileSizes = new long[countOfFiles]; + long[] sumSize = new long[countOfFiles]; + for (int i = countOfFiles - 1; i >= 0; --i) { + StoreFile file = candidates.getFilesToCompact().get(i); + fileSizes[i] = file.getReader().length(); + // calculate the sum of fileSizes[i,i+maxFilesToCompact-1) for algo + int tooFar = i + comConf.getMaxFilesToCompact() - 1; + sumSize[i] = fileSizes[i] + + ((i + 1 < countOfFiles) ? sumSize[i + 1] : 0) + - ((tooFar < countOfFiles) ? fileSizes[tooFar] : 0); + } + + + while (countOfFiles - start >= comConf.getMinFilesToCompact() && + fileSizes[start] > Math.max(comConf.getMinCompactSize(), + (long) (sumSize[start + 1] * ratio))) { + ++start; + } + if (start < countOfFiles) { + LOG.info("Default compaction algorithm has selected " + (countOfFiles - start) + + " files from " + countOfFiles + " candidates"); + } + + candidates = candidates.getSubList(start, countOfFiles); + + return candidates; + } + + /* + * @param filesToCompact Files to compact. Can be null. + * @return True if we should run a major compaction. + */ + public boolean isMajorCompaction(final List filesToCompact) + throws IOException { + boolean result = false; + long mcTime = getNextMajorCompactTime(filesToCompact); + if (filesToCompact == null || filesToCompact.isEmpty() || mcTime == 0) { + return result; + } + // TODO: Use better method for determining stamp of last major (HBASE-2990) + long lowTimestamp = StoreUtils.getLowestTimestamp(filesToCompact); + long now = System.currentTimeMillis(); + if (lowTimestamp > 0l && lowTimestamp < (now - mcTime)) { + // Major compaction time has elapsed. + long cfTtl = this.storeConfig.getStoreFileTtl(); + if (filesToCompact.size() == 1) { + // Single file + StoreFile sf = filesToCompact.get(0); + Long minTimestamp = sf.getMinimumTimestamp(); + long oldest = (minTimestamp == null) + ? 
Long.MIN_VALUE + : now - minTimestamp.longValue(); + if (sf.isMajorCompaction() && + (cfTtl == HConstants.FOREVER || oldest < cfTtl)) { + if (LOG.isDebugEnabled()) { + LOG.debug("Skipping major compaction of " + this + + " because one (major) compacted file only and oldestTime " + + oldest + "ms is < ttl=" + cfTtl); + } + } else if (cfTtl != HConstants.FOREVER && oldest > cfTtl) { + LOG.debug("Major compaction triggered on store " + this + + ", because keyvalues outdated; time since last major compaction " + + (now - lowTimestamp) + "ms"); + result = true; + } + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("Major compaction triggered on store " + this + + "; time since last major compaction " + (now - lowTimestamp) + "ms"); + } + result = true; + } + } + return result; + } + + public long getNextMajorCompactTime(final List filesToCompact) { + // default = 24hrs + long ret = comConf.getMajorCompactionPeriod(); + if (ret > 0) { + // default = 20% = +/- 4.8 hrs + double jitterPct = comConf.getMajorCompactionJitter(); + if (jitterPct > 0) { + long jitter = Math.round(ret * jitterPct); + // deterministic jitter avoids a major compaction storm on restart + Integer seed = StoreUtils.getDeterministicRandomSeed(filesToCompact); + if (seed != null) { + double rnd = (new Random(seed)).nextDouble(); + ret += jitter - Math.round(2L * jitter * rnd); + } else { + ret = 0; // no storefiles == no major compaction + } + } + } + return ret; + } + + /** + * @param compactionSize Total size of some compaction + * @return whether this should be a large or small compaction + */ + public boolean throttleCompaction(long compactionSize) { + return compactionSize > comConf.getThrottlePoint(); + } + + /** + * @param numCandidates Number of candidate store files + * @return whether a compactionSelection is possible + */ + public boolean needsCompaction(int numCandidates) { + return numCandidates > comConf.getMinFilesToCompact(); + } + + /** + * @return whether this is off-peak hour + */ + private boolean isOffPeakHour() { + int currentHour = calendar.get(Calendar.HOUR_OF_DAY); + int startHour = comConf.getOffPeakStartHour(); + int endHour = comConf.getOffPeakEndHour(); + // If offpeak time checking is disabled just return false. 
+ if (startHour == endHour) { + return false; + } + if (startHour < endHour) { + return (currentHour >= startHour && currentHour < endHour); + } + return (currentHour >= startHour || currentHour < endHour); + } +} \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java index 80e4d5ee090..9b74552175f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java @@ -208,6 +208,10 @@ public class CompactionRequest implements Comparable, return p; } + public long getSelectionTime() { + return compactSelection.getSelectionTime(); + } + /** Gets the priority for the request */ public void setPriority(int p) { this.p = p; @@ -271,7 +275,7 @@ public class CompactionRequest implements Comparable, server.checkFileSystem(); } finally { s.finishRequest(this); - LOG.debug("CompactSplitThread status: " + server.compactSplitThread); + LOG.debug("CompactSplitThread Status: " + server.compactSplitThread); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java index ed5f173ef79..72c50aa5299 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java @@ -49,8 +49,7 @@ import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl; import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder; import org.apache.hadoop.hbase.io.hfile.HFileScanner; -import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress; -import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; +import org.apache.hadoop.hbase.regionserver.compactions.*; import org.apache.hadoop.hbase.regionserver.wal.HLog; import org.apache.hadoop.hbase.util.Bytes; import org.junit.experimental.categories.Category; @@ -302,6 +301,7 @@ public class TestCompaction extends HBaseTestCase { conf.setFloat("hbase.hregion.majorcompaction.jitter", jitterPct); HStore s = ((HStore) r.getStore(COLUMN_FAMILY)); + s.compactionPolicy.updateConfiguration(conf, s); try { createStoreFile(r); createStoreFile(r); @@ -313,9 +313,11 @@ public class TestCompaction extends HBaseTestCase { assertEquals(2, s.getStorefilesCount()); // ensure that major compaction time is deterministic - long mcTime = s.getNextMajorCompactTime(); + CompactionPolicy c = s.compactionPolicy; + List storeFiles = s.getStorefiles(); + long mcTime = c.getNextMajorCompactTime(storeFiles); for (int i = 0; i < 10; ++i) { - assertEquals(mcTime, s.getNextMajorCompactTime()); + assertEquals(mcTime, c.getNextMajorCompactTime(storeFiles)); } // ensure that the major compaction time is within the variance diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultCompactSelection.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultCompactSelection.java new file mode 100644 index 00000000000..03b1acc51c8 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultCompactSelection.java @@ -0,0 +1,321 @@ +/** + * Licensed to the Apache 
Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Calendar; +import java.util.GregorianCalendar; +import java.util.List; + +import junit.framework.TestCase; +import org.junit.experimental.categories.Category; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.*; +import org.apache.hadoop.hbase.io.hfile.CacheConfig; +import org.apache.hadoop.hbase.io.hfile.NoOpDataBlockEncoder; +import org.apache.hadoop.hbase.regionserver.compactions.*; +import org.apache.hadoop.hbase.regionserver.wal.HLog; +import org.apache.hadoop.hbase.regionserver.wal.HLogFactory; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; + +import com.google.common.collect.Lists; + +@Category(SmallTests.class) +public class TestDefaultCompactSelection extends TestCase { + private final static Log LOG = LogFactory.getLog(TestDefaultCompactSelection.class); + private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + + protected Configuration conf; + protected HStore store; + private static final String DIR= + TEST_UTIL.getDataTestDir(TestDefaultCompactSelection.class.getSimpleName()).toString(); + private static Path TEST_FILE; + private CompactionPolicy manager; + + protected static final int minFiles = 3; + protected static final int maxFiles = 5; + + protected static final long minSize = 10; + protected static final long maxSize = 1000; + + + @Override + public void setUp() throws Exception { + // setup config values necessary for store + this.conf = TEST_UTIL.getConfiguration(); + this.conf.setLong(HConstants.MAJOR_COMPACTION_PERIOD, 0); + this.conf.setInt("hbase.hstore.compaction.min", minFiles); + this.conf.setInt("hbase.hstore.compaction.max", maxFiles); + this.conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, minSize); + this.conf.setLong("hbase.hstore.compaction.max.size", maxSize); + this.conf.setFloat("hbase.hstore.compaction.ratio", 1.0F); + + //Setting up a Store + Path basedir = new Path(DIR); + String logName = "logs"; + Path logdir = new Path(DIR, logName); + Path oldLogDir = new Path(basedir, HConstants.HREGION_OLDLOGDIR_NAME); + HColumnDescriptor hcd = new HColumnDescriptor(Bytes.toBytes("family")); + FileSystem fs = FileSystem.get(conf); + + fs.delete(logdir, true); + + HTableDescriptor htd = new HTableDescriptor(Bytes.toBytes("table")); + htd.addFamily(hcd); + HRegionInfo info = new HRegionInfo(htd.getName(), null, null, false); + + HLog hlog = HLogFactory.createHLog(fs, 
basedir, + logName, conf); + HRegion region = HRegion.createHRegion(info, basedir, conf, htd); + HRegion.closeHRegion(region); + Path tableDir = new Path(basedir, Bytes.toString(htd.getName())); + region = new HRegion(tableDir, hlog, fs, conf, info, htd, null); + + store = new HStore(basedir, region, hcd, fs, conf); + manager = store.compactionPolicy; + + TEST_FILE = StoreFile.getRandomFilename(fs, store.getHomedir()); + fs.create(TEST_FILE); + } + + // used so our tests don't deal with actual StoreFiles + static class MockStoreFile extends StoreFile { + long length = 0; + boolean isRef = false; + long ageInDisk; + long sequenceid; + + MockStoreFile(long length, long ageInDisk, boolean isRef, long sequenceid) throws IOException { + super(TEST_UTIL.getTestFileSystem(), TEST_FILE, TEST_UTIL.getConfiguration(), + new CacheConfig(TEST_UTIL.getConfiguration()), BloomType.NONE, + NoOpDataBlockEncoder.INSTANCE); + this.length = length; + this.isRef = isRef; + this.ageInDisk = ageInDisk; + this.sequenceid = sequenceid; + } + + void setLength(long newLen) { + this.length = newLen; + } + + @Override + public long getMaxSequenceId() { + return sequenceid; + } + + @Override + public boolean isMajorCompaction() { + return false; + } + + @Override + public boolean isReference() { + return this.isRef; + } + + @Override + public StoreFile.Reader getReader() { + final long len = this.length; + return new StoreFile.Reader() { + @Override + public long length() { + return len; + } + }; + } + } + + ArrayList toArrayList(long... numbers) { + ArrayList result = new ArrayList(); + for (long i : numbers) { + result.add(i); + } + return result; + } + + List sfCreate(long... sizes) throws IOException { + ArrayList ageInDisk = new ArrayList(); + for (int i = 0; i < sizes.length; i++) { + ageInDisk.add(0L); + } + return sfCreate(toArrayList(sizes), ageInDisk); + } + + List sfCreate(ArrayList sizes, ArrayList ageInDisk) + throws IOException { + return sfCreate(false, sizes, ageInDisk); + } + + List sfCreate(boolean isReference, long... sizes) throws IOException { + ArrayList ageInDisk = new ArrayList(sizes.length); + for (int i = 0; i < sizes.length; i++) { + ageInDisk.add(0L); + } + return sfCreate(isReference, toArrayList(sizes), ageInDisk); + } + + List sfCreate(boolean isReference, ArrayList sizes, ArrayList ageInDisk) + throws IOException { + List ret = Lists.newArrayList(); + for (int i = 0; i < sizes.size(); i++) { + ret.add(new MockStoreFile(sizes.get(i), ageInDisk.get(i), isReference, i)); + } + return ret; + } + + long[] getSizes(List sfList) { + long[] aNums = new long[sfList.size()]; + for (int i = 0; i < sfList.size(); ++i) { + aNums[i] = sfList.get(i).getReader().length(); + } + return aNums; + } + + void compactEquals(List candidates, long... expected) + throws IOException { + compactEquals(candidates, false, expected); + } + + void compactEquals(List candidates, boolean forcemajor, + long ... expected) + throws IOException { + store.forceMajor = forcemajor; + //Test Default compactions + List actual = store.compactionPolicy + .selectCompaction(candidates, false, forcemajor).getFilesToCompact(); + assertEquals(Arrays.toString(expected), Arrays.toString(getSizes(actual))); + store.forceMajor = false; + } + + public void testCompactionRatio() throws IOException { + /** + * NOTE: these tests are specific to describe the implementation of the + * current compaction algorithm. Developed to ensure that refactoring + * doesn't implicitly alter this. + */ + long tooBig = maxSize + 1; + + // default case. 
preserve user ratio on size + compactEquals(sfCreate(100,50,23,12,12), 23, 12, 12); + // less than compact threshold = don't compact + compactEquals(sfCreate(100,50,25,12,12) /* empty */); + // greater than compact size = skip those + compactEquals(sfCreate(tooBig, tooBig, 700, 700, 700), 700, 700, 700); + // big size + threshold + compactEquals(sfCreate(tooBig, tooBig, 700,700) /* empty */); + // small files = don't care about ratio + compactEquals(sfCreate(8,3,1), 8,3,1); + /* TODO: add sorting + unit test back in when HBASE-2856 is fixed + // sort first so you don't include huge file the tail end. + // happens with HFileOutputFormat bulk migration + compactEquals(sfCreate(100,50,23,12,12, 500), 23, 12, 12); + */ + // don't exceed max file compact threshold + // note: file selection starts with largest to smallest. + compactEquals(sfCreate(7, 6, 5, 4, 3, 2, 1), 7, 6, 5, 4, 3); + + /* MAJOR COMPACTION */ + // if a major compaction has been forced, then compact everything + compactEquals(sfCreate(50,25,12,12), true, 50, 25, 12, 12); + // also choose files < threshold on major compaction + compactEquals(sfCreate(12,12), true, 12, 12); + // even if one of those files is too big + compactEquals(sfCreate(tooBig, 12,12), true, tooBig, 12, 12); + // don't exceed max file compact threshold, even with major compaction + store.forceMajor = true; + compactEquals(sfCreate(7, 6, 5, 4, 3, 2, 1), 7, 6, 5, 4, 3); + store.forceMajor = false; + // if we exceed maxCompactSize, downgrade to minor + // if not, it creates a 'snowball effect' when files >> maxCompactSize: + // the last file in compaction is the aggregate of all previous compactions + compactEquals(sfCreate(100,50,23,12,12), true, 23, 12, 12); + conf.setLong(HConstants.MAJOR_COMPACTION_PERIOD, 1); + conf.setFloat("hbase.hregion.majorcompaction.jitter", 0); + store.compactionPolicy.updateConfiguration(conf, store); + try { + // trigger an aged major compaction + compactEquals(sfCreate(50,25,12,12), 50, 25, 12, 12); + // major sure exceeding maxCompactSize also downgrades aged minors + compactEquals(sfCreate(100,50,23,12,12), 23, 12, 12); + } finally { + conf.setLong(HConstants.MAJOR_COMPACTION_PERIOD, 1000*60*60*24); + conf.setFloat("hbase.hregion.majorcompaction.jitter", 0.20F); + } + + /* REFERENCES == file is from a region that was split */ + // treat storefiles that have references like a major compaction + compactEquals(sfCreate(true, 100,50,25,12,12), 100, 50, 25, 12, 12); + // reference files shouldn't obey max threshold + compactEquals(sfCreate(true, tooBig, 12,12), tooBig, 12, 12); + // reference files should obey max file compact to avoid OOM + compactEquals(sfCreate(true, 7, 6, 5, 4, 3, 2, 1), 7, 6, 5, 4, 3); + + // empty case + compactEquals(new ArrayList() /* empty */); + // empty case (because all files are too big) + compactEquals(sfCreate(tooBig, tooBig) /* empty */); + } + + public void testOffPeakCompactionRatio() throws IOException { + /* + * NOTE: these tests are specific to describe the implementation of the + * current compaction algorithm. Developed to ensure that refactoring + * doesn't implicitly alter this. 
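The MAJOR_COMPACTION_PERIOD and hbase.hregion.majorcompaction.jitter values toggled in testCompactionRatio above feed getNextMajorCompactTime. A small sketch of that arithmetic, assuming the 24 hour default period and 20 percent jitter noted in the policy code; the class name and the seed are invented, whereas in HBase the seed comes from StoreUtils.getDeterministicRandomSeed on the store files:

    public class MajorCompactionJitterSketch {
      public static void main(String[] args) {
        long period = 24L * 60 * 60 * 1000;            // assumed 24h base period
        double jitterPct = 0.20;                       // assumed 20% jitter
        long jitter = Math.round(period * jitterPct);  // +/- 4.8 hours
        long seed = 42L;                               // invented; derived from store files in HBase
        double rnd = new java.util.Random(seed).nextDouble();
        long next = period + jitter - Math.round(2L * jitter * rnd);
        // next stays within plus or minus one jitter of the base period, and the
        // same seed gives the same answer after a restart, which is the point of
        // the deterministic jitter.
        System.out.println(next);
      }
    }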
+ */ + long tooBig = maxSize + 1; + + Calendar calendar = new GregorianCalendar(); + int hourOfDay = calendar.get(Calendar.HOUR_OF_DAY); + LOG.debug("Hour of day = " + hourOfDay); + int hourPlusOne = ((hourOfDay+1)%24); + int hourMinusOne = ((hourOfDay-1+24)%24); + int hourMinusTwo = ((hourOfDay-2+24)%24); + + // check compact selection without peak hour setting + LOG.debug("Testing compact selection without off-peak settings..."); + compactEquals(sfCreate(999,50,12,12,1), 12, 12, 1); + + // set an off-peak compaction threshold + this.conf.setFloat("hbase.hstore.compaction.ratio.offpeak", 5.0F); + + // set peak hour to current time and check compact selection + this.conf.setLong("hbase.offpeak.start.hour", hourMinusOne); + this.conf.setLong("hbase.offpeak.end.hour", hourPlusOne); + LOG.debug("Testing compact selection with off-peak settings (" + + hourMinusOne + ", " + hourPlusOne + ")"); + store.compactionPolicy.updateConfiguration(this.conf, store); + compactEquals(sfCreate(999, 50, 12, 12, 1), 50, 12, 12, 1); + + // set peak hour outside current selection and check compact selection + this.conf.setLong("hbase.offpeak.start.hour", hourMinusTwo); + this.conf.setLong("hbase.offpeak.end.hour", hourMinusOne); + store.compactionPolicy.updateConfiguration(this.conf, store); + LOG.debug("Testing compact selection with off-peak settings (" + + hourMinusTwo + ", " + hourMinusOne + ")"); + compactEquals(sfCreate(999,50,12,12, 1), 12, 12, 1); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java index 66fe256d1fd..01627da9daf 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java @@ -248,17 +248,15 @@ public class TestStore extends TestCase { flush(i); } // after flush; check the lowest time stamp - long lowestTimeStampFromStore = - HStore.getLowestTimestamp(store.getStorefiles()); - long lowestTimeStampFromFS = - getLowestTimeStampFromFS(fs,store.getStorefiles()); - assertEquals(lowestTimeStampFromStore,lowestTimeStampFromFS); - + long lowestTimeStampFromManager = StoreUtils.getLowestTimestamp(store.getStorefiles()); + long lowestTimeStampFromFS = getLowestTimeStampFromFS(fs, store.getStorefiles()); + assertEquals(lowestTimeStampFromManager,lowestTimeStampFromFS); + // after compact; check the lowest time stamp store.compact(store.requestCompaction()); - lowestTimeStampFromStore = HStore.getLowestTimestamp(store.getStorefiles()); - lowestTimeStampFromFS = getLowestTimeStampFromFS(fs,store.getStorefiles()); - assertEquals(lowestTimeStampFromStore,lowestTimeStampFromFS); + lowestTimeStampFromManager = StoreUtils.getLowestTimestamp(store.getStorefiles()); + lowestTimeStampFromFS = getLowestTimeStampFromFS(fs, store.getStorefiles()); + assertEquals(lowestTimeStampFromManager, lowestTimeStampFromFS); } private static long getLowestTimeStampFromFS(FileSystem fs,