diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HBaseConfiguration.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HBaseConfiguration.java
index 95defd3a00c..20ce0d0e0e6 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HBaseConfiguration.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HBaseConfiguration.java
@@ -98,6 +98,7 @@ public class HBaseConfiguration extends Configuration {
public static Configuration addHbaseResources(Configuration conf) {
conf.addResource("hbase-default.xml");
conf.addResource("hbase-site.xml");
+ conf.addResource("hbase-compactions.xml");
checkDefaultsVersion(conf);
checkForClusterFreeMemoryLimit(conf);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java
index 860d9093925..9d32edf5891 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java
@@ -223,6 +223,8 @@ public class HFileOutputFormat extends FileOutputFormat
+ * maxCompactSize - upper bound on file size to be included in minor compactions
+ * minCompactSize - lower bound below which compaction is selected without ratio test
+ * minFilesToCompact - lower bound on number of files in any minor compaction
+ * maxFilesToCompact - upper bound on number of files in any minor compaction
+ * compactionRatio - Ratio used for compaction
+ *
+ * Parameters are read under the "hbase.hstore.compaction." prefix.
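+ *
+ * For example, the keys read by this class include hbase.hstore.compaction.min.size,
+ * hbase.hstore.compaction.max.size, hbase.hstore.compaction.min, hbase.hstore.compaction.max,
+ * hbase.hstore.compaction.ratio and hbase.hstore.compaction.ratio.offpeak; any of them can be
+ * set in hbase-site.xml or in the hbase-compactions.xml resource added by this patch.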
+ */
+
+//TODO: revisit this class for online parameter updating
+
+public class CompactionConfiguration {
+
+ static final Log LOG = LogFactory.getLog(CompactionConfiguration.class);
+
+ Configuration conf;
+ Store store;
+
+ long maxCompactSize;
+ long minCompactSize;
+ int minFilesToCompact;
+ int maxFilesToCompact;
+ double compactionRatio;
+ double offPeakCompactionRatio;
+ int offPeakStartHour;
+ int offPeakEndHour;
+ long throttlePoint;
+ boolean shouldDeleteExpired;
+ long majorCompactionPeriod;
+ float majorCompactionJitter;
+
+ CompactionConfiguration(Configuration conf, Store store) {
+ this.conf = conf;
+ this.store = store;
+
+ String strPrefix = "hbase.hstore.compaction.";
+
+ maxCompactSize = conf.getLong(strPrefix + "max.size", Long.MAX_VALUE);
+ minCompactSize = conf.getLong(strPrefix + "min.size", store.getHRegion().memstoreFlushSize);
+ minFilesToCompact = Math.max(2, conf.getInt(strPrefix + "min",
+ /*old name*/ conf.getInt("hbase.hstore.compactionThreshold", 3)));
+ maxFilesToCompact = conf.getInt(strPrefix + "max", 10);
+ compactionRatio = conf.getFloat(strPrefix + "ratio", 1.2F);
+ offPeakCompactionRatio = conf.getFloat(strPrefix + "ratio.offpeak", 5.0F);
+ offPeakStartHour = conf.getInt("hbase.offpeak.start.hour", -1);
+ offPeakEndHour = conf.getInt("hbase.offpeak.end.hour", -1);
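+ // Example (hypothetical values): hbase.offpeak.start.hour=22 and hbase.offpeak.end.hour=6
+ // apply the more aggressive "ratio.offpeak" selection ratio between 10 PM and 6 AM.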
+
+ if (!isValidHour(offPeakStartHour) || !isValidHour(offPeakEndHour)) {
+ if (!(offPeakStartHour == -1 && offPeakEndHour == -1)) {
+ LOG.warn("Ignoring invalid start/end hour for peak hour : start = " +
+ this.offPeakStartHour + " end = " + this.offPeakEndHour +
+ ". Valid numbers are [0-23]");
+ }
+ this.offPeakStartHour = this.offPeakEndHour = -1;
+ }
+
+ throttlePoint = conf.getLong("hbase.regionserver.thread.compaction.throttle",
+ 2 * maxFilesToCompact * store.getHRegion().memstoreFlushSize);
+ shouldDeleteExpired = conf.getBoolean("hbase.store.delete.expired.storefile", true);
+ majorCompactionPeriod = conf.getLong(HConstants.MAJOR_COMPACTION_PERIOD, 1000*60*60*24);
+ majorCompactionJitter = conf.getFloat("hbase.hregion.majorcompaction.jitter", 0.20F);
+
+ LOG.info("Compaction configuration " + this.toString());
+ }
+
+ @Override
+ public String toString() {
+ return String.format(
+ "size [%d, %d); files [%d, %d); ratio %f; off-peak ratio %f; off-peak hours %d-%d; " +
+ "throttle point %d;%s delete expired; major period %d, major jitter %f",
+ minCompactSize,
+ maxCompactSize,
+ minFilesToCompact,
+ maxFilesToCompact,
+ compactionRatio,
+ offPeakCompactionRatio,
+ offPeakStartHour,
+ offPeakEndHour,
+ throttlePoint,
+ shouldDeleteExpired ? "" : " don't",
+ majorCompactionPeriod,
+ majorCompactionJitter);
+ }
+
+ /**
+ * @return lower bound below which compaction is selected without ratio test
+ */
+ long getMinCompactSize() {
+ return minCompactSize;
+ }
+
+ /**
+ * @return upper bound on file size to be included in minor compactions
+ */
+ long getMaxCompactSize() {
+ return maxCompactSize;
+ }
+
+ /**
+ * @return lower bound on number of files to be included in minor compactions
+ */
+ int getMinFilesToCompact() {
+ return minFilesToCompact;
+ }
+
+ /**
+ * @return upper bound on number of files to be included in minor compactions
+ */
+ int getMaxFilesToCompact() {
+ return maxFilesToCompact;
+ }
+
+ /**
+ * @return Ratio used for compaction
+ */
+ double getCompactionRatio() {
+ return compactionRatio;
+ }
+
+ /**
+ * @return Off peak Ratio used for compaction
+ */
+ double getCompactionRatioOffPeak() {
+ return offPeakCompactionRatio;
+ }
+
+ /**
+ * @return Hour at which off-peak compactions start
+ */
+ int getOffPeakStartHour() {
+ return offPeakStartHour;
+ }
+
+ /**
+ * @return Hour at which off-peak compactions end
+ */
+ int getOffPeakEndHour() {
+ return offPeakEndHour;
+ }
+
+ /**
+ * @return ThrottlePoint used for classifying small and large compactions
+ */
+ long getThrottlePoint() {
+ return throttlePoint;
+ }
+
+ /**
+ * @return Major compaction period, in milliseconds.
+ * Major compactions are selected periodically according to this parameter plus jitter
+ */
+ long getMajorCompactionPeriod() {
+ return majorCompactionPeriod;
+ }
+
+ /**
+ * @return The major compaction jitter fraction: each store's actual major compaction period
+ * is chosen randomly within this fraction of majorCompactionPeriod.
+ */
+ float getMajorCompactionJitter() {
+ return majorCompactionJitter;
+ }
+
+ /**
+ * @return Whether expired files should be deleted ASAP using compactions
+ */
+ boolean shouldDeleteExpired() {
+ return shouldDeleteExpired;
+ }
+
+ private static boolean isValidHour(int hour) {
+ return (hour >= 0 && hour <= 23);
+ }
+}
\ No newline at end of file
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionManager.java
new file mode 100644
index 00000000000..716b27ea77a
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionManager.java
@@ -0,0 +1,411 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.Collections2;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactSelection;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.util.StringUtils;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.GregorianCalendar;
+import java.util.List;
+import java.util.Random;
+
+@InterfaceAudience.Private
+public class CompactionManager {
+
+ private static final Log LOG = LogFactory.getLog(CompactionManager.class);
+
+ private Store store;
+ CompactionConfiguration comConf;
+
+ CompactionManager(Configuration configuration, Store store) {
+ this.store = store;
+ comConf = new CompactionConfiguration(configuration, store);
+ }
+
+ /**
+ * @param candidateFiles candidate files, ordered from oldest to newest
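+ * @param priority compaction request priority; Store.PRIORITY_USER marks a user-requested compaction
+ * @param forceMajor whether the caller has asked for a major compaction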
+ * @return subset copy of candidate list that meets compaction criteria
+ * @throws java.io.IOException
+ */
+ CompactSelection selectCompaction(List<StoreFile> candidateFiles, int priority, boolean forceMajor)
+ throws IOException {
+ // Preliminary compaction subject to filters
+ CompactSelection candidateSelection = new CompactSelection(candidateFiles);
+
+ if (!forceMajor) {
+ // If there are expired files, only select them so that compaction deletes them
+ if (comConf.shouldDeleteExpired() && (store.getTtl() != Long.MAX_VALUE)) {
+ CompactSelection expiredSelection = selectExpiredSFs(
+ candidateSelection, EnvironmentEdgeManager.currentTimeMillis() - store.getTtl());
+ if (expiredSelection != null) {
+ return expiredSelection;
+ }
+ }
+ candidateSelection = skipLargeFiles(candidateSelection);
+ }
+
+ // Force a major compaction if this is a user-requested major compaction,
+ // or if we do not have too many files to compact and this was requested
+ // as a major compaction.
+ // Or, if there are any references among the candidates.
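+ // (Reference files are the half-file links created by a region split; they must be
+ // rewritten by a compaction before the daughter regions can split again.)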
+ boolean isUserCompaction = (priority == Store.PRIORITY_USER);
+ boolean majorCompaction = (
+ (forceMajor && isUserCompaction)
+ || ((forceMajor || isMajorCompaction(candidateSelection.getFilesToCompact()))
+ && (candidateSelection.getFilesToCompact().size() < comConf.getMaxFilesToCompact()))
+ || store.hasReferences(candidateSelection.getFilesToCompact())
+ );
+
+ LOG.debug(store.getHRegion().regionInfo.getEncodedName() + " - " +
+ store.getColumnFamilyName() + ": Initiating " +
+ (majorCompaction ? "major" : "minor") + "compaction");
+
+ if (!majorCompaction) {
+ // we're doing a minor compaction, let's see what files are applicable
+ candidateSelection = filterBulk(candidateSelection);
+ candidateSelection = applyCompactionPolicy(candidateSelection);
+ candidateSelection = checkMinFilesCriteria(candidateSelection);
+ }
+ candidateSelection = removeExcessFiles(candidateSelection, isUserCompaction, majorCompaction);
+ return candidateSelection;
+ }
+
+ /**
+ * Select the expired store files to compact
+ *
+ * @param candidates the initial set of storeFiles
+ * @param maxExpiredTimeStamp
+ * The store file will be marked as expired if its max time stamp is
+ * less than this maxExpiredTimeStamp.
+ * @return A CompactSelection contains the expired store files as
+ * filesToCompact
+ */
+ private CompactSelection selectExpiredSFs
+ (CompactSelection candidates, long maxExpiredTimeStamp) {
+ List<StoreFile> filesToCompact = candidates.getFilesToCompact();
+ if (filesToCompact == null || filesToCompact.size() == 0)
+ return null;
+ ArrayList<StoreFile> expiredStoreFiles = null;
+ boolean hasExpiredStoreFiles = false;
+ CompactSelection expiredSFSelection = null;
+
+ for (StoreFile storeFile : filesToCompact) {
+ if (storeFile.getReader().getMaxTimestamp() < maxExpiredTimeStamp) {
+ LOG.info("Deleting the expired store file by compaction: "
+ + storeFile.getPath() + " whose maxTimeStamp is "
+ + storeFile.getReader().getMaxTimestamp()
+ + " while the max expired timestamp is " + maxExpiredTimeStamp);
+ if (!hasExpiredStoreFiles) {
+ expiredStoreFiles = new ArrayList<StoreFile>();
+ hasExpiredStoreFiles = true;
+ }
+ expiredStoreFiles.add(storeFile);
+ }
+ }
+
+ if (hasExpiredStoreFiles) {
+ expiredSFSelection = new CompactSelection(expiredStoreFiles);
+ }
+ return expiredSFSelection;
+ }
+
+ /**
+ * @param candidates pre-filtrate
+ * @return filtered subset
+ * exclude all files above maxCompactSize
+ * Also save all references. We MUST compact them
+ */
+ private CompactSelection skipLargeFiles(CompactSelection candidates) {
+ int pos = 0;
+ while (pos < candidates.getFilesToCompact().size() &&
+ candidates.getFilesToCompact().get(pos).getReader().length() >
+ comConf.getMaxCompactSize() &&
+ !candidates.getFilesToCompact().get(pos).isReference()) {
+ ++pos;
+ }
+ if (pos > 0) {
+ LOG.debug("Some files are too large. Excluding " + pos + " files from compaction candidates");
+ candidates.clearSubList(0, pos);
+ }
+ return candidates;
+ }
+
+ /**
+ * @param candidates pre-filtrate
+ * @return filtered subset
+ * exclude all bulk load files if configured
+ */
+ private CompactSelection filterBulk(CompactSelection candidates) {
+ candidates.getFilesToCompact().removeAll(Collections2.filter(
+ candidates.getFilesToCompact(),
+ new Predicate<StoreFile>() {
+ @Override
+ public boolean apply(StoreFile input) {
+ return input.excludeFromMinorCompaction();
+ }
+ }));
+ return candidates;
+ }
+
+ /**
+ * @param candidates pre-filtrate
+ * @return filtered subset
+ * take up to maxFilesToCompact from the start
+ */
+ private CompactSelection removeExcessFiles(CompactSelection candidates,
+ boolean isUserCompaction, boolean isMajorCompaction) {
+ int excess = candidates.getFilesToCompact().size() - comConf.getMaxFilesToCompact();
+ if (excess > 0) {
+ if (isMajorCompaction && isUserCompaction) {
+ LOG.debug("Warning, compacting more than " + comConf.getMaxFilesToCompact() +
+ " files because of a user-requested major compaction");
+ } else {
+ LOG.debug("Too many admissible files. Excluding " + excess
+ + " files from compaction candidates");
+ candidates.clearSubList(comConf.getMaxFilesToCompact(),
+ candidates.getFilesToCompact().size());
+ }
+ }
+ return candidates;
+ }
+ /**
+ * @param candidates pre-filtrate
+ * @return filtered subset
+ * forget the compactionSelection if we don't have enough files
+ */
+ private CompactSelection checkMinFilesCriteria(CompactSelection candidates) {
+ int minFiles = comConf.getMinFilesToCompact();
+ if (candidates.getFilesToCompact().size() < minFiles) {
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Not compacting files because we only have " +
+ candidates.getFilesToCompact().size() +
+ " files ready for compaction. Need " + minFiles + " to initiate.");
+ }
+ candidates.emptyFileList();
+ }
+ return candidates;
+ }
+
+ /**
+ * @param candidates pre-filtrate
+ * @return filtered subset
+ * -- Default minor compaction selection algorithm: Choose CompactSelection from candidates --
+ * First exclude bulk-load files if indicated in configuration.
+ * Start at the oldest file and stop when you find the first file that
+ * meets compaction criteria:
+ * (1) a recently-flushed, small file (i.e. <= minCompactSize)
+ * OR
+ * (2) within the compactRatio of sum(newer_files)
+ * Given normal skew, any newer files will also meet this criteria
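+ *
+ * Worked example (hypothetical sizes, ratio 1.2, minCompactSize below every file size,
+ * maxFilesToCompact large enough that no sums are truncated), oldest to newest:
+ * 1000, 400, 115, 60, 60
+ *   1000 > 1.2 * (400 + 115 + 60 + 60) = 762.0 -> skip
+ *    400 > 1.2 * (115 + 60 + 60) = 282.0 -> skip
+ *    115 <= 1.2 * (60 + 60) = 144.0 -> stop; {115, 60, 60} is selected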
+ *
+ * Additional Note:
+ * If fileSizes.size() >> maxFilesToCompact, we will recurse on
+ * compact(). Consider the oldest files first to avoid a
+ * situation where we always compact [end-threshold,end). Then, the
+ * last file becomes an aggregate of the previous compactions.
+ *
+ * normal skew:
+ *
+ * older ----> newer (increasing seqID)
+ * _
+ * | | _
+ * | | | | _
+ * --|-|- |-|- |-|---_-------_------- minCompactSize
+ * | | | | | | | | _ | |
+ * | | | | | | | | | | | |
+ * | | | | | | | | | | | |
+ */
+ CompactSelection applyCompactionPolicy(CompactSelection candidates) throws IOException {
+ if (candidates.getFilesToCompact().isEmpty()) {
+ return candidates;
+ }
+
+ // we're doing a minor compaction, let's see what files are applicable
+ int start = 0;
+ double ratio = comConf.getCompactionRatio();
+ if (isOffPeakHour() && candidates.trySetOffpeak()) {
+ ratio = comConf.getCompactionRatioOffPeak();
+ LOG.info("Running an off-peak compaction, selection ratio = " + ratio
+ + ", numOutstandingOffPeakCompactions is now "
+ + CompactSelection.getNumOutStandingOffPeakCompactions());
+ }
+
+ // get store file sizes for incremental compacting selection.
+ int countOfFiles = candidates.getFilesToCompact().size();
+ long[] fileSizes = new long[countOfFiles];
+ long[] sumSize = new long[countOfFiles];
+ for (int i = countOfFiles - 1; i >= 0; --i) {
+ StoreFile file = candidates.getFilesToCompact().get(i);
+ fileSizes[i] = file.getReader().length();
+ // calculate the sum of fileSizes[i,i+maxFilesToCompact-1) for algo
+ int tooFar = i + comConf.getMaxFilesToCompact() - 1;
+ sumSize[i] = fileSizes[i]
+ + ((i + 1 < countOfFiles) ? sumSize[i + 1] : 0)
+ - ((tooFar < countOfFiles) ? fileSizes[tooFar] : 0);
+ }
+
+
+ while (countOfFiles - start >= comConf.getMinFilesToCompact() &&
+ fileSizes[start] > Math.max(comConf.getMinCompactSize(), (long) (sumSize[start + 1] * ratio))) {
+ ++start;
+ }
+ if (start < countOfFiles) {
+ LOG.info("Default compaction algorithm has selected " + (countOfFiles - start)
+ + " files from " + countOfFiles + " candidates");
+ }
+
+ candidates = candidates.getSubList(start, countOfFiles);
+
+ return candidates;
+ }
+
+ /*
+ * @param filesToCompact Files to compact. Can be null.
+ * @return True if we should run a major compaction.
+ */
+ boolean isMajorCompaction(final List<StoreFile> filesToCompact) throws IOException {
+ boolean result = false;
+ long mcTime = getNextMajorCompactTime();
+ if (filesToCompact == null || filesToCompact.isEmpty() || mcTime == 0) {
+ return result;
+ }
+ // TODO: Use better method for determining stamp of last major (HBASE-2990)
+ long lowTimestamp = getLowestTimestamp(filesToCompact);
+ long now = System.currentTimeMillis();
+ if (lowTimestamp > 0l && lowTimestamp < (now - mcTime)) {
+ // Major compaction time has elapsed.
+ if (filesToCompact.size() == 1) {
+ // Single file
+ StoreFile sf = filesToCompact.get(0);
+ long oldest =
+ (sf.getReader().timeRangeTracker == null) ?
+ Long.MIN_VALUE :
+ now - sf.getReader().timeRangeTracker.minimumTimestamp;
+ if (sf.isMajorCompaction() &&
+ (store.getTtl() == HConstants.FOREVER || oldest < store.getTtl())) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Skipping major compaction of " + this +
+ " because one (major) compacted file only and oldestTime " +
+ oldest + "ms is < ttl=" + store.getTtl());
+ }
+ } else if (store.getTtl() != HConstants.FOREVER && oldest > store.getTtl()) {
+ LOG.debug("Major compaction triggered on store " + this +
+ ", because keyvalues outdated; time since last major compaction " +
+ (now - lowTimestamp) + "ms");
+ result = true;
+ }
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Major compaction triggered on store " + this +
+ "; time since last major compaction " + (now - lowTimestamp) + "ms");
+ }
+ result = true;
+ }
+ }
+ return result;
+ }
+
+ long getNextMajorCompactTime() {
+ // default = 24hrs
+ long ret = comConf.getMajorCompactionPeriod();
+ String strCompactionTime = store.getFamily().getValue(HConstants.MAJOR_COMPACTION_PERIOD);
+ if (strCompactionTime != null) {
+ ret = (new Long(strCompactionTime)).longValue();
+ }
+
+ if (ret > 0) {
+ // default = 20% = +/- 4.8 hrs
+ double jitterPct = comConf.getMajorCompactionJitter();
+ if (jitterPct > 0) {
+ long jitter = Math.round(ret * jitterPct);
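+ // e.g. with the 24h default period and 0.2 jitter, each store's effective period ends up
+ // roughly uniformly distributed within [19.2h, 28.8h]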
+ // deterministic jitter avoids a major compaction storm on restart
+ Integer seed = store.getDeterministicRandomSeed();
+ if (seed != null) {
+ double rnd = (new Random(seed)).nextDouble();
+ ret += jitter - Math.round(2L * jitter * rnd);
+ } else {
+ ret = 0; // no storefiles == no major compaction
+ }
+ }
+ }
+ return ret;
+ }
+
+ /*
+ * Gets lowest timestamp from candidate StoreFiles
+ *
+ * @param fs
+ * @param dir
+ * @throws IOException
+ */
+ static long getLowestTimestamp(final List<StoreFile> candidates)
+ throws IOException {
+ long minTs = Long.MAX_VALUE;
+ for (StoreFile storeFile : candidates) {
+ minTs = Math.min(minTs, storeFile.getModificationTimeStamp());
+ }
+ return minTs;
+ }
+
+ /**
+ * @param compactionSize Total size of some compaction
+ * @return whether this should be a large or small compaction
+ */
+ boolean throttleCompaction(long compactionSize) {
+ return compactionSize > comConf.getThrottlePoint();
+ }
+
+ /**
+ * @param numCandidates Number of candidate store files
+ * @return whether a compactionSelection is possible
+ */
+ boolean needsCompaction(int numCandidates) {
+ return numCandidates > comConf.getMinFilesToCompact();
+ }
+
+ /**
+ * @return whether this is off-peak hour
+ */
+ private boolean isOffPeakHour() {
+ // Use a fresh calendar so the check reflects the current hour at call time.
+ int currentHour = new GregorianCalendar().get(Calendar.HOUR_OF_DAY);
+ int startHour = comConf.getOffPeakStartHour();
+ int endHour = comConf.getOffPeakEndHour();
+ // If offpeak time checking is disabled just return false.
+ if (startHour == endHour) {
+ return false;
+ }
+ if (startHour < endHour) {
+ return (currentHour >= startHour && currentHour < endHour);
+ }
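+ // Otherwise the off-peak window wraps past midnight (e.g. start=22, end=6).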
+ return (currentHour >= startHour || currentHour < endHour);
+ }
+}
\ No newline at end of file
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Compactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Compactor.java
index a526345e18c..c0748cd9ed9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Compactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Compactor.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.hfile.Compression;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.util.StringUtils;
/**
@@ -64,11 +65,15 @@ class Compactor extends Configured {
final Collection<StoreFile> filesToCompact,
final boolean majorCompaction, final long maxId)
throws IOException {
- // Calculate maximum key count after compaction (for blooms)
+ // Calculate maximum key count after compaction (for blooms), and minFlushTime after compaction
// Also calculate earliest put timestamp if major compaction
int maxKeyCount = 0;
+ long minFlushTime = Long.MAX_VALUE;
long earliestPutTs = HConstants.LATEST_TIMESTAMP;
for (StoreFile file: filesToCompact) {
+ if (file.hasMinFlushTime() && file.getMinFlushTime() < minFlushTime) {
+ minFlushTime = file.getMinFlushTime();
+ }
StoreFile.Reader r = file.getReader();
if (r == null) {
LOG.warn("Null reader for " + file.getPath());
@@ -194,6 +199,10 @@ class Compactor extends Configured {
}
} finally {
if (writer != null) {
+ if (minFlushTime == Long.MAX_VALUE) {
+ minFlushTime = StoreFile.NO_MIN_FLUSH_TIME;
+ }
+ writer.appendFileInfo(StoreFile.MIN_FLUSH_TIME, Bytes.toBytes(minFlushTime));
writer.appendMetadata(maxId, majorCompaction);
writer.close();
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index a91fc490fdd..529865d66a1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -4176,7 +4176,7 @@ public class HRegion implements HeapSize { // , Writable{
}
/**
- * @return True if needs a mojor compaction.
+ * @return True if needs a major compaction.
* @throws IOException
*/
boolean isMajorCompaction() throws IOException {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index 32d83f95c5a..2000d318870 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -63,9 +64,10 @@ import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.io.hfile.InvalidHFileException;
import org.apache.hadoop.hbase.io.hfile.NoOpDataBlockEncoder;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
-import org.apache.hadoop.hbase.regionserver.compactions.CompactSelection;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactSelection;
+import org.apache.hadoop.hbase.regionserver.CompactionManager;
import org.apache.hadoop.hbase.regionserver.metrics.SchemaConfigured;
import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
import org.apache.hadoop.hbase.util.Bytes;
@@ -77,8 +79,6 @@ import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.util.StringUtils;
import com.google.common.base.Preconditions;
-import com.google.common.base.Predicate;
-import com.google.common.collect.Collections2;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
@@ -108,21 +108,24 @@ import com.google.common.collect.Lists;
@InterfaceAudience.Private
public class HStore extends SchemaConfigured implements Store {
static final Log LOG = LogFactory.getLog(HStore.class);
+
+ /** Parameter name for what compaction manager to use. */
+ private static final String COMPACTION_MANAGER_CLASS = "hbase.compactionmanager.class";
+
+ /** Default compaction manager class name. */
+ private static final String DEFAULT_COMPACTION_MANAGER_CLASS = CompactionManager.class.getName();
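+ // For example, hbase.compactionmanager.class can name an alternative policy such as the
+ // TierCompactionManager added elsewhere in this patch; setCompactionPolicy below instantiates
+ // the configured CompactionManager subclass via its (Configuration, Store) constructor.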
protected final MemStore memstore;
// This stores directory in the filesystem.
private final Path homedir;
private final HRegion region;
private final HColumnDescriptor family;
+ CompactionManager compactionManager;
final FileSystem fs;
final Configuration conf;
final CacheConfig cacheConf;
- // ttl in milliseconds.
+ // ttl in milliseconds. TODO: can this be removed? Already stored in scanInfo.
private long ttl;
- private final int minFilesToCompact;
- private final int maxFilesToCompact;
- private final long minCompactSize;
- private final long maxCompactSize;
private long lastCompactSize = 0;
volatile boolean forceMajor = false;
/* how many bytes to write between status checks */
@@ -197,7 +200,7 @@ public class HStore extends SchemaConfigured implements Store {
this.comparator = info.getComparator();
// Get TTL
- this.ttl = getTTL(family);
+ this.ttl = determineTTLFromFamily(family);
// used by ScanQueryMatcher
long timeToPurgeDeletes =
Math.max(conf.getLong("hbase.hstore.time.to.purge.deletes", 0), 0);
@@ -208,23 +211,11 @@ public class HStore extends SchemaConfigured implements Store {
scanInfo = new ScanInfo(family, ttl, timeToPurgeDeletes, this.comparator);
this.memstore = new MemStore(conf, this.comparator);
- // By default, compact if storefile.count >= minFilesToCompact
- this.minFilesToCompact = Math.max(2,
- conf.getInt("hbase.hstore.compaction.min",
- /*old name*/ conf.getInt("hbase.hstore.compactionThreshold", 3)));
- LOG.info("hbase.hstore.compaction.min = " + this.minFilesToCompact);
-
// Setting up cache configuration for this family
this.cacheConf = new CacheConfig(conf, family);
this.blockingStoreFileCount =
conf.getInt("hbase.hstore.blockingStoreFiles", 7);
- this.maxFilesToCompact = conf.getInt("hbase.hstore.compaction.max", 10);
- this.minCompactSize = conf.getLong("hbase.hstore.compaction.min.size",
- this.region.memstoreFlushSize);
- this.maxCompactSize
- = conf.getLong("hbase.hstore.compaction.max.size", Long.MAX_VALUE);
-
this.verifyBulkLoads = conf.getBoolean("hbase.hstore.bulkload.verify", false);
if (HStore.closeCheckInterval == 0) {
@@ -239,13 +230,53 @@ public class HStore extends SchemaConfigured implements Store {
this.bytesPerChecksum = getBytesPerChecksum(conf);
// Create a compaction tool instance
this.compactor = new Compactor(this.conf);
+
+ setCompactionPolicy(conf.get(COMPACTION_MANAGER_CLASS, DEFAULT_COMPACTION_MANAGER_CLASS));
}
+ /**
+ * This setter is used for unit testing
+ * TODO: Fix this for online configuration updating
+ */
+ void setCompactionPolicy(String managerClassName) {
+ try {
+ Class<? extends CompactionManager> managerClass =
+ (Class<? extends CompactionManager>) Class.forName(managerClassName);
+ compactionManager = managerClass.getDeclaredConstructor(
+ new Class[] {Configuration.class, Store.class } ).newInstance(
+ new Object[] { conf, this } );
+ } catch (ClassNotFoundException e) {
+ throw new UnsupportedOperationException(
+ "Unable to find region server interface " + managerClassName, e);
+ } catch (IllegalAccessException e) {
+ throw new UnsupportedOperationException(
+ "Unable to access specified class " + managerClassName, e);
+ } catch (InstantiationException e) {
+ throw new UnsupportedOperationException(
+ "Unable to instantiate specified class " + managerClassName, e);
+ } catch (InvocationTargetException e) {
+ throw new UnsupportedOperationException(
+ "Unable to invoke specified target class constructor " + managerClassName, e);
+ } catch (NoSuchMethodException e) {
+ throw new UnsupportedOperationException(
+ "Unable to find suitable constructor for class " + managerClassName, e);
+ }
+ }
+
+ @Override
+ public Integer getDeterministicRandomSeed() {
+ ImmutableList<StoreFile> snapshot = storefiles;
+ if (snapshot != null && !snapshot.isEmpty()) {
+ return snapshot.get(0).getPath().getName().hashCode();
+ }
+ return null;
+ }
+
/**
* @param family
* @return
*/
- long getTTL(final HColumnDescriptor family) {
+ private static long determineTTLFromFamily(final HColumnDescriptor family) {
// HCD.getTimeToLive returns ttl in seconds. Convert to milliseconds.
long ttl = family.getTimeToLive();
if (ttl == HConstants.FOREVER) {
@@ -280,6 +311,11 @@ public class HStore extends SchemaConfigured implements Store {
return this.fs;
}
+ public long getTtl() {
+ // TTL only applies if there's no MIN_VERSIONs setting on the column.
+ return (this.scanInfo.getMinVersions() == 0) ? this.ttl : Long.MAX_VALUE;
+ }
+
/**
* Returns the configured bytesPerChecksum value.
* @param conf The configuration
@@ -771,8 +807,11 @@ public class HStore extends SchemaConfigured implements Store {
} while (hasMore);
} finally {
// Write out the log sequence number that corresponds to this output
- // hfile. The hfile is current up to and including logCacheFlushId.
+ // hfile. Also write current time in metadata as minFlushTime.
+ // The hfile is current up to and including logCacheFlushId.
status.setStatus("Flushing " + this + ": appending metadata");
+ writer.appendFileInfo(StoreFile.MIN_FLUSH_TIME,
+ Bytes.toBytes(EnvironmentEdgeManager.currentTimeMillis()));
writer.appendMetadata(logCacheFlushId, false);
status.setStatus("Flushing " + this + ": closing flushed file");
writer.close();
@@ -1014,12 +1053,12 @@ public class HStore extends SchemaConfigured implements Store {
// Ready to go. Have list of files to compact.
LOG.info("Starting compaction of " + filesToCompact.size() + " file(s) in "
- + this + " of "
- + this.region.getRegionInfo().getRegionNameAsString()
+ + this + " of " + this.region.getRegionInfo().getRegionNameAsString()
+ " into tmpdir=" + region.getTmpDir() + ", seqid=" + maxId + ", totalSize="
+ StringUtils.humanReadableInt(cr.getSize()));
StoreFile sf = null;
+ long compactionStartTime = EnvironmentEdgeManager.currentTimeMillis();
try {
StoreFile.Writer writer =
this.compactor.compact(this, filesToCompact, cr.isMajor(), maxId);
@@ -1048,8 +1087,11 @@ public class HStore extends SchemaConfigured implements Store {
(sf == null ? "none" : sf.getPath().getName()) +
", size=" + (sf == null ? "none" :
StringUtils.humanReadableInt(sf.getReader().length()))
- + "; total size for store is "
- + StringUtils.humanReadableInt(storeSize));
+ + "; total size for store is " + StringUtils.humanReadableInt(storeSize)
+ + ". This selection was in queue for "
+ + StringUtils.formatTimeDiff(compactionStartTime, cr.getSelectionTime()) + ", and took "
+ + StringUtils.formatTimeDiff(EnvironmentEdgeManager.currentTimeMillis(), compactionStartTime)
+ + " to execute.");
return sf;
}
@@ -1107,11 +1149,8 @@ public class HStore extends SchemaConfigured implements Store {
return hasReferences(this.storefiles);
}
- /*
- * @param files
- * @return True if any of the files in <code>files</code> are References.
- */
- private boolean hasReferences(Collection<StoreFile> files) {
+ @Override
+ public boolean hasReferences(Collection<StoreFile> files) {
if (files != null && files.size() > 0) {
for (StoreFile hsf: files) {
if (hsf.isReference()) {
@@ -1122,22 +1161,6 @@ public class HStore extends SchemaConfigured implements Store {
return false;
}
- /*
- * Gets lowest timestamp from candidate StoreFiles
- *
- * @param fs
- * @param dir
- * @throws IOException
- */
- public static long getLowestTimestamp(final List<StoreFile> candidates)
- throws IOException {
- long minTs = Long.MAX_VALUE;
- for (StoreFile storeFile : candidates) {
- minTs = Math.min(minTs, storeFile.getModificationTimeStamp());
- }
- return minTs;
- }
-
@Override
public CompactionProgress getCompactionProgress() {
return this.compactor.getProgress();
@@ -1153,91 +1176,7 @@ public class HStore extends SchemaConfigured implements Store {
}
List<StoreFile> candidates = new ArrayList<StoreFile>(this.storefiles);
-
- // exclude files above the max compaction threshold
- // except: save all references. we MUST compact them
- int pos = 0;
- while (pos < candidates.size() &&
- candidates.get(pos).getReader().length() > this.maxCompactSize &&
- !candidates.get(pos).isReference()) ++pos;
- candidates.subList(0, pos).clear();
-
- return isMajorCompaction(candidates);
- }
-
- /*
- * @param filesToCompact Files to compact. Can be null.
- * @return True if we should run a major compaction.
- */
- private boolean isMajorCompaction(final List filesToCompact) throws IOException {
- boolean result = false;
- long mcTime = getNextMajorCompactTime();
- if (filesToCompact == null || filesToCompact.isEmpty() || mcTime == 0) {
- return result;
- }
- // TODO: Use better method for determining stamp of last major (HBASE-2990)
- long lowTimestamp = getLowestTimestamp(filesToCompact);
- long now = System.currentTimeMillis();
- if (lowTimestamp > 0l && lowTimestamp < (now - mcTime)) {
- // Major compaction time has elapsed.
- if (filesToCompact.size() == 1) {
- // Single file
- StoreFile sf = filesToCompact.get(0);
- long oldest =
- (sf.getReader().timeRangeTracker == null) ?
- Long.MIN_VALUE :
- now - sf.getReader().timeRangeTracker.minimumTimestamp;
- if (sf.isMajorCompaction() &&
- (this.ttl == HConstants.FOREVER || oldest < this.ttl)) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Skipping major compaction of " + this +
- " because one (major) compacted file only and oldestTime " +
- oldest + "ms is < ttl=" + this.ttl);
- }
- } else if (this.ttl != HConstants.FOREVER && oldest > this.ttl) {
- LOG.debug("Major compaction triggered on store " + this +
- ", because keyvalues outdated; time since last major compaction " +
- (now - lowTimestamp) + "ms");
- result = true;
- }
- } else {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Major compaction triggered on store " + this +
- "; time since last major compaction " + (now - lowTimestamp) + "ms");
- }
- result = true;
- }
- }
- return result;
- }
-
- long getNextMajorCompactTime() {
- // default = 24hrs
- long ret = conf.getLong(HConstants.MAJOR_COMPACTION_PERIOD, 1000*60*60*24);
- if (family.getValue(HConstants.MAJOR_COMPACTION_PERIOD) != null) {
- String strCompactionTime =
- family.getValue(HConstants.MAJOR_COMPACTION_PERIOD);
- ret = (new Long(strCompactionTime)).longValue();
- }
-
- if (ret > 0) {
- // default = 20% = +/- 4.8 hrs
- double jitterPct = conf.getFloat("hbase.hregion.majorcompaction.jitter",
- 0.20F);
- if (jitterPct > 0) {
- long jitter = Math.round(ret * jitterPct);
- // deterministic jitter avoids a major compaction storm on restart
- ImmutableList<StoreFile> snapshot = storefiles;
- if (snapshot != null && !snapshot.isEmpty()) {
- String seed = snapshot.get(0).getPath().getName();
- double curRand = new Random(seed.hashCode()).nextDouble();
- ret += jitter - Math.round(2L * jitter * curRand);
- } else {
- ret = 0; // no storefiles == no major compaction
- }
- }
- }
- return ret;
+ return compactionManager.isMajorCompaction(candidates);
}
public CompactionRequest requestCompaction() throws IOException {
@@ -1273,9 +1212,10 @@ public class HStore extends SchemaConfigured implements Store {
CompactSelection filesToCompact;
if (override) {
// coprocessor is overriding normal file selection
- filesToCompact = new CompactSelection(conf, candidates);
+ filesToCompact = new CompactSelection(candidates);
} else {
- filesToCompact = compactSelection(candidates, priority);
+ filesToCompact = compactionManager.selectCompaction(candidates, priority,
+ forceMajor && filesCompacting.isEmpty());
}
if (region.getCoprocessorHost() != null) {
@@ -1325,191 +1265,6 @@ public class HStore extends SchemaConfigured implements Store {
}
}
- /**
- * Algorithm to choose which files to compact, see {@link #compactSelection(java.util.List, int)}
- * @param candidates
- * @return
- * @throws IOException
- */
- CompactSelection compactSelection(List<StoreFile> candidates) throws IOException {
- return compactSelection(candidates,Store.NO_PRIORITY);
- }
-
- /**
- * Algorithm to choose which files to compact
- *
- * Configuration knobs:
- * "hbase.hstore.compaction.ratio"
- * normal case: minor compact when file <= sum(smaller_files) * ratio
- * "hbase.hstore.compaction.min.size"
- * unconditionally compact individual files below this size
- * "hbase.hstore.compaction.max.size"
- * never compact individual files above this size (unless splitting)
- * "hbase.hstore.compaction.min"
- * min files needed to minor compact
- * "hbase.hstore.compaction.max"
- * max files to compact at once (avoids OOM)
- *
- * @param candidates candidate files, ordered from oldest to newest
- * @return subset copy of candidate list that meets compaction criteria
- * @throws IOException
- */
- CompactSelection compactSelection(List<StoreFile> candidates, int priority)
- throws IOException {
- // ASSUMPTION!!! filesCompacting is locked when calling this function
-
- /* normal skew:
- *
- * older ----> newer
- * _
- * | | _
- * | | | | _
- * --|-|- |-|- |-|---_-------_------- minCompactSize
- * | | | | | | | | _ | |
- * | | | | | | | | | | | |
- * | | | | | | | | | | | |
- */
- CompactSelection compactSelection = new CompactSelection(conf, candidates);
-
- boolean forcemajor = this.forceMajor && filesCompacting.isEmpty();
- if (!forcemajor) {
- // Delete the expired store files before the compaction selection.
- if (conf.getBoolean("hbase.store.delete.expired.storefile", true)
- && (ttl != Long.MAX_VALUE) && (this.scanInfo.minVersions == 0)) {
- CompactSelection expiredSelection = compactSelection
- .selectExpiredStoreFilesToCompact(
- EnvironmentEdgeManager.currentTimeMillis() - this.ttl);
-
- // If there is any expired store files, delete them by compaction.
- if (expiredSelection != null) {
- return expiredSelection;
- }
- }
- // do not compact old files above a configurable threshold
- // save all references. we MUST compact them
- int pos = 0;
- while (pos < compactSelection.getFilesToCompact().size() &&
- compactSelection.getFilesToCompact().get(pos).getReader().length()
- > maxCompactSize &&
- !compactSelection.getFilesToCompact().get(pos).isReference()) ++pos;
- if (pos != 0) compactSelection.clearSubList(0, pos);
- }
-
- if (compactSelection.getFilesToCompact().isEmpty()) {
- LOG.debug(this.getHRegionInfo().getEncodedName() + " - " +
- this + ": no store files to compact");
- compactSelection.emptyFileList();
- return compactSelection;
- }
-
- // Force a major compaction if this is a user-requested major compaction,
- // or if we do not have too many files to compact and this was requested
- // as a major compaction
- boolean majorcompaction = (forcemajor && priority == Store.PRIORITY_USER) ||
- (forcemajor || isMajorCompaction(compactSelection.getFilesToCompact())) &&
- (compactSelection.getFilesToCompact().size() < this.maxFilesToCompact
- );
- LOG.debug(this.getHRegionInfo().getEncodedName() + " - " +
- this.getColumnFamilyName() + ": Initiating " +
- (majorcompaction ? "major" : "minor") + "compaction");
-
- if (!majorcompaction &&
- !hasReferences(compactSelection.getFilesToCompact())) {
- // we're doing a minor compaction, let's see what files are applicable
- int start = 0;
- double r = compactSelection.getCompactSelectionRatio();
-
- // remove bulk import files that request to be excluded from minors
- compactSelection.getFilesToCompact().removeAll(Collections2.filter(
- compactSelection.getFilesToCompact(),
- new Predicate<StoreFile>() {
- public boolean apply(StoreFile input) {
- return input.excludeFromMinorCompaction();
- }
- }));
-
- // skip selection algorithm if we don't have enough files
- if (compactSelection.getFilesToCompact().size() < this.minFilesToCompact) {
- if(LOG.isDebugEnabled()) {
- LOG.debug("Not compacting files because we only have " +
- compactSelection.getFilesToCompact().size() +
- " files ready for compaction. Need " + this.minFilesToCompact + " to initiate.");
- }
- compactSelection.emptyFileList();
- return compactSelection;
- }
-
- /* TODO: add sorting + unit test back in when HBASE-2856 is fixed
- // Sort files by size to correct when normal skew is altered by bulk load.
- Collections.sort(filesToCompact, StoreFile.Comparators.FILE_SIZE);
- */
-
- // get store file sizes for incremental compacting selection.
- int countOfFiles = compactSelection.getFilesToCompact().size();
- long [] fileSizes = new long[countOfFiles];
- long [] sumSize = new long[countOfFiles];
- for (int i = countOfFiles-1; i >= 0; --i) {
- StoreFile file = compactSelection.getFilesToCompact().get(i);
- fileSizes[i] = file.getReader().length();
- // calculate the sum of fileSizes[i,i+maxFilesToCompact-1) for algo
- int tooFar = i + this.maxFilesToCompact - 1;
- sumSize[i] = fileSizes[i]
- + ((i+1 < countOfFiles) ? sumSize[i+1] : 0)
- - ((tooFar < countOfFiles) ? fileSizes[tooFar] : 0);
- }
-
- /* Start at the oldest file and stop when you find the first file that
- * meets compaction criteria:
- * (1) a recently-flushed, small file (i.e. <= minCompactSize)
- * OR
- * (2) within the compactRatio of sum(newer_files)
- * Given normal skew, any newer files will also meet this criteria
- *
- * Additional Note:
- * If fileSizes.size() >> maxFilesToCompact, we will recurse on
- * compact(). Consider the oldest files first to avoid a
- * situation where we always compact [end-threshold,end). Then, the
- * last file becomes an aggregate of the previous compactions.
- */
- while(countOfFiles - start >= this.minFilesToCompact &&
- fileSizes[start] >
- Math.max(minCompactSize, (long)(sumSize[start+1] * r))) {
- ++start;
- }
- int end = Math.min(countOfFiles, start + this.maxFilesToCompact);
- long totalSize = fileSizes[start]
- + ((start+1 < countOfFiles) ? sumSize[start+1] : 0);
- compactSelection = compactSelection.getSubList(start, end);
-
- // if we don't have enough files to compact, just wait
- if (compactSelection.getFilesToCompact().size() < this.minFilesToCompact) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Skipped compaction of " + this
- + ". Only " + (end - start) + " file(s) of size "
- + StringUtils.humanReadableInt(totalSize)
- + " have met compaction criteria.");
- }
- compactSelection.emptyFileList();
- return compactSelection;
- }
- } else {
- if(majorcompaction) {
- if (compactSelection.getFilesToCompact().size() > this.maxFilesToCompact) {
- LOG.debug("Warning, compacting more than " + this.maxFilesToCompact +
- " files, probably because of a user-requested major compaction");
- if(priority != Store.PRIORITY_USER) {
- LOG.error("Compacting more than max files on a non user-requested compaction");
- }
- }
- } else if (compactSelection.getFilesToCompact().size() > this.maxFilesToCompact) {
- // all files included in this compaction, up to max
- int pastMax = compactSelection.getFilesToCompact().size() - this.maxFilesToCompact;
- compactSelection.getFilesToCompact().subList(0, pastMax).clear();
- }
- }
- return compactSelection;
- }
-
/**
* Validates a store file by opening and closing it. In HFileV2 this should
* not be an expensive operation.
@@ -2017,11 +1772,7 @@ public class HStore extends SchemaConfigured implements Store {
@Override
public boolean throttleCompaction(long compactionSize) {
- // see HBASE-5867 for discussion on the default
- long throttlePoint = conf.getLong(
- "hbase.regionserver.thread.compaction.throttle",
- 2 * this.minFilesToCompact * this.region.memstoreFlushSize);
- return compactionSize > throttlePoint;
+ return compactionManager.throttleCompaction(compactionSize);
}
@Override
@@ -2116,7 +1867,7 @@ public class HStore extends SchemaConfigured implements Store {
@Override
public boolean needsCompaction() {
- return (storefiles.size() - filesCompacting.size()) > minFilesToCompact;
+ return compactionManager.needsCompaction(storefiles.size() - filesCompacting.size());
}
@Override
@@ -2126,8 +1877,8 @@ public class HStore extends SchemaConfigured implements Store {
public static final long FIXED_OVERHEAD =
ClassSize.align(SchemaConfigured.SCHEMA_CONFIGURED_UNALIGNED_HEAP_SIZE +
- + (17 * ClassSize.REFERENCE) + (6 * Bytes.SIZEOF_LONG)
- + (5 * Bytes.SIZEOF_INT) + Bytes.SIZEOF_BOOLEAN);
+ + (18 * ClassSize.REFERENCE) + (4 * Bytes.SIZEOF_LONG)
+ + (3 * Bytes.SIZEOF_INT) + Bytes.SIZEOF_BOOLEAN);
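+ // Heap accounting: compactionManager adds one reference, while minCompactSize/maxCompactSize
+ // (two longs) and minFilesToCompact/maxFilesToCompact (two ints) moved to CompactionConfiguration.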
public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD
+ ClassSize.OBJECT + ClassSize.REENTRANT_LOCK
@@ -2148,6 +1899,15 @@ public class HStore extends SchemaConfigured implements Store {
return scanInfo;
}
+ /**
+ * Refreshes compaction manager class configuration.
+ * Used for tests only - not plumbed thru any layers.
+ * TODO: replace when HBASE-3909 is in.
+ */
+ void updateConfiguration() {
+ setCompactionPolicy(conf.get(COMPACTION_MANAGER_CLASS, DEFAULT_COMPACTION_MANAGER_CLASS));
+ }
+
/**
* Immutable information for scans over a store.
*/
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
index d391a16df74..54d6390c64f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
@@ -18,6 +18,8 @@
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.util.Collection;
import java.util.List;
import java.util.NavigableSet;
@@ -204,6 +206,12 @@ public interface Store extends SchemaAware, HeapSize {
* @return true if the store has any underlying reference files to older HFiles
*/
public boolean hasReferences();
+
+ /*
+ * @param files
+ * @return True if any of the files in <code>files</code> are References.
+ */
+ public boolean hasReferences(Collection<StoreFile> files);
/**
* @return The size of this store's memstore, in bytes
@@ -267,6 +275,11 @@ public interface Store extends SchemaAware, HeapSize {
* @return the total size of all Bloom filters in the store
*/
public long getTotalStaticBloomSize();
+
+ /**
+ * Returns the TTL for this store's column family.
+ */
+ public long getTtl();
// Test-helper methods
@@ -287,4 +300,10 @@ public interface Store extends SchemaAware, HeapSize {
* @return the parent region hosting this store
*/
public HRegion getHRegion();
+
+ /**
+ * @return A hash code depending on the state of the current store files.
+ * This is used as seed for deterministic random generator for selecting major compaction time
+ */
+ public Integer getDeterministicRandomSeed();
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
index af1225a1481..6fc0beb8cf5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
@@ -67,6 +67,7 @@ import org.apache.hadoop.hbase.util.BloomFilter;
import org.apache.hadoop.hbase.util.BloomFilterFactory;
import org.apache.hadoop.hbase.util.BloomFilterWriter;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.io.RawComparator;
@@ -114,6 +115,9 @@ public class StoreFile extends SchemaConfigured {
/** Max Sequence ID in FileInfo */
public static final byte [] MAX_SEQ_ID_KEY = Bytes.toBytes("MAX_SEQ_ID_KEY");
+ /** Min Flush time in FileInfo */
+ public static final byte [] MIN_FLUSH_TIME = Bytes.toBytes("MIN_FLUSH_TIME");
+
/** Major compaction flag in FileInfo */
public static final byte[] MAJOR_COMPACTION_KEY =
Bytes.toBytes("MAJOR_COMPACTION_KEY");
@@ -143,6 +147,9 @@ public class StoreFile extends SchemaConfigured {
// Need to make it 8k for testing.
public static final int DEFAULT_BLOCKSIZE_SMALL = 8 * 1024;
+ /** Default value for files without minFlushTime in metadata */
+ public static final long NO_MIN_FLUSH_TIME = -1;
+
private final FileSystem fs;
// This file's path.
@@ -169,6 +176,8 @@ public class StoreFile extends SchemaConfigured {
// Keys for metadata stored in backing HFile.
// Set when we obtain a Reader.
private long sequenceid = -1;
+ // default value is -1, remains -1 if file written without minFlushTime
+ private long minFlushTime = NO_MIN_FLUSH_TIME;
// max of the MemstoreTS in the KV's in this store
// Set when we obtain a Reader.
@@ -381,6 +390,22 @@ public class StoreFile extends SchemaConfigured {
return this.sequenceid;
}
+ public boolean hasMinFlushTime() {
+ return this.minFlushTime != NO_MIN_FLUSH_TIME;
+ }
+
+ public long getMinFlushTime() {
+ // BulkLoad files are assumed to contain very old data, return 0
+ if (isBulkLoadResult() && getMaxSequenceId() <= 0) {
+ return 0;
+ } else if (this.minFlushTime == NO_MIN_FLUSH_TIME) {
+ // File written without minFlushTime field assume recent data
+ return EnvironmentEdgeManager.currentTimeMillis();
+ } else {
+ return this.minFlushTime;
+ }
+ }
+
public long getModificationTimeStamp() {
return modificationTimeStamp;
}
@@ -587,7 +612,10 @@ public class StoreFile extends SchemaConfigured {
}
}
}
-
+ b = metadataMap.get(MIN_FLUSH_TIME);
+ if (b != null) {
+ this.minFlushTime = Bytes.toLong(b);
+ }
this.reader.setSequenceID(this.sequenceid);
b = metadataMap.get(HFileWriterV2.MAX_MEMSTORE_TS_KEY);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/TierCompactionConfiguration.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/TierCompactionConfiguration.java
new file mode 100644
index 00000000000..725aa5c0d59
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/TierCompactionConfiguration.java
@@ -0,0 +1,267 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.StringUtils;
+
+import java.text.DecimalFormat;
+
+/**
+ * Control knobs for the tier-based compaction algorithm
+ */
+@InterfaceAudience.Private
+public class TierCompactionConfiguration extends CompactionConfiguration {
+
+ private CompactionTier[] compactionTier;
+ private boolean recentFirstOrder;
+
+ TierCompactionConfiguration(Configuration conf, Store store) {
+ super(conf, store);
+
+ String strPrefix = "hbase.hstore.compaction.";
+ String strSchema = "tbl." + store.getHRegion().getTableDesc().getNameAsString()
+ + "cf." + store.getFamily().getNameAsString() + ".";
+ String strDefault = "Default.";
+ String strAttribute;
+ // If value not set for family, use default family (by passing null).
+ // If default value not set, use 1 tier.
+
+ strAttribute = "NumCompactionTiers";
+ compactionTier = new CompactionTier[
+ conf.getInt(strPrefix + strSchema + strAttribute,
+ conf.getInt(strPrefix + strDefault + strAttribute,
+ 1))];
+
+ strAttribute = "IsRecentFirstOrder";
+ recentFirstOrder =
+ conf.getBoolean(strPrefix + strSchema + strAttribute,
+ conf.getBoolean(strPrefix + strDefault + strAttribute,
+ true));
+
+ strAttribute = "MinCompactSize";
+ minCompactSize =
+ conf.getLong(strPrefix + strSchema + strAttribute,
+ conf.getLong(strPrefix + strDefault + strAttribute,
+ 0));
+
+ strAttribute = "MaxCompactSize";
+ maxCompactSize =
+ conf.getLong(strPrefix + strSchema + strAttribute,
+ conf.getLong(strPrefix + strDefault + strAttribute,
+ Long.MAX_VALUE));
+
+ strAttribute = "ShouldDeleteExpired";
+ shouldDeleteExpired =
+ conf.getBoolean(strPrefix + strSchema + strAttribute,
+ conf.getBoolean(strPrefix + strDefault + strAttribute,
+ shouldDeleteExpired));
+
+ strAttribute = "ThrottlePoint";
+ throttlePoint =
+ conf.getLong(strPrefix + strSchema + strAttribute,
+ conf.getLong(strPrefix + strDefault + strAttribute,
+ throttlePoint));
+
+ strAttribute = "MajorCompactionPeriod";
+ majorCompactionPeriod =
+ conf.getLong(strPrefix + strSchema + strAttribute,
+ conf.getLong(strPrefix + strDefault + strAttribute,
+ majorCompactionPeriod));
+
+ strAttribute = "MajorCompactionJitter";
+ majorCompactionJitter =
+ conf.getFloat(
+ strPrefix + strSchema + strAttribute,
+ conf.getFloat(
+ strPrefix + strDefault + strAttribute,
+ majorCompactionJitter
+ )
+ );
+
+ for (int i = 0; i < compactionTier.length; i++) {
+ compactionTier[i] = new CompactionTier(i);
+ }
+ }
+ /**
+ * @return Number of compaction Tiers
+ */
+ int getNumCompactionTiers() {
+ return compactionTier.length;
+ }
+
+ /**
+ * @return The i-th tier from most recent
+ */
+ CompactionTier getCompactionTier(int i) {
+ return compactionTier[i];
+ }
+
+ /**
+ * @return Whether the tiers will be checked for compaction from newest to oldest
+ */
+ boolean isRecentFirstOrder() {
+ return recentFirstOrder;
+ }
+
+ /**
+ * Parameters for each tier
+ */
+ class CompactionTier {
+
+ private long maxAgeInDisk;
+ private long maxSize;
+ private double tierCompactionRatio;
+ private int tierMinFilesToCompact;
+ private int tierMaxFilesToCompact;
+ private int endingIndexForTier;
+
+ CompactionTier(int tier) {
+ String strPrefix = "hbase.hstore.compaction.";
+ String strSchema = "tbl." + store.getHRegion().getTableDesc().getNameAsString()
+ + "cf." + store.getFamily().getNameAsString() + ".";
+ String strDefault = "Default.";
+ String strDefTier = "";
+ String strTier = "Tier." + String.valueOf(tier) + ".";
+ String strAttribute;
+
+ /**
+ * Use value set for current family, current tier
+ * If not set, use value set for current family, default tier
+ * if not set, use value set for Default family, current tier
+ * If not set, use value set for Default family, default tier
+ * Else just use a default value
+ */
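+ // For CompactionRatio of tier 1, for instance, the keys consulted in order are
+ // (table and column family names are placeholders):
+ //   hbase.hstore.compaction.tbl.<table>cf.<cf>.Tier.1.CompactionRatio
+ //   hbase.hstore.compaction.tbl.<table>cf.<cf>.CompactionRatio
+ //   hbase.hstore.compaction.Default.Tier.1.CompactionRatio
+ //   hbase.hstore.compaction.Default.CompactionRatio
+ // before falling back to the store-wide compactionRatio.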
+
+ strAttribute = "MaxAgeInDisk";
+ maxAgeInDisk =
+ conf.getLong(strPrefix + strSchema + strTier + strAttribute,
+ conf.getLong(strPrefix + strDefault + strTier + strAttribute,
+ Long.MAX_VALUE));
+
+ strAttribute = "MaxSize";
+ maxSize =
+ conf.getLong(strPrefix + strSchema + strTier + strAttribute,
+ conf.getLong(strPrefix + strDefault + strTier + strAttribute,
+ Long.MAX_VALUE));
+
+ strAttribute = "CompactionRatio";
+ tierCompactionRatio = (double)
+ conf.getFloat(strPrefix + strSchema + strTier + strAttribute,
+ conf.getFloat(strPrefix + strSchema + strDefTier + strAttribute,
+ conf.getFloat(strPrefix + strDefault + strTier + strAttribute,
+ conf.getFloat(strPrefix + strDefault + strDefTier + strAttribute,
+ (float) compactionRatio))));
+
+ strAttribute = "MinFilesToCompact";
+ tierMinFilesToCompact =
+ conf.getInt(strPrefix + strSchema + strTier + strAttribute,
+ conf.getInt(strPrefix + strSchema + strDefTier + strAttribute,
+ conf.getInt(strPrefix + strDefault + strTier + strAttribute,
+ conf.getInt(strPrefix + strDefault + strDefTier + strAttribute,
+ minFilesToCompact))));
+
+ strAttribute = "MaxFilesToCompact";
+ tierMaxFilesToCompact =
+ conf.getInt(strPrefix + strSchema + strTier + strAttribute,
+ conf.getInt(strPrefix + strSchema + strDefTier + strAttribute,
+ conf.getInt(strPrefix + strDefault + strTier + strAttribute,
+ conf.getInt(strPrefix + strDefault + strDefTier + strAttribute,
+ maxFilesToCompact))));
+
+ strAttribute = "EndingIndexForTier";
+ endingIndexForTier =
+ conf.getInt(strPrefix + strSchema + strTier + strAttribute,
+ conf.getInt(strPrefix + strDefault + strTier + strAttribute,
+ tier));
+
+ //make sure this value is not incorrectly set
+ if (endingIndexForTier < 0 || endingIndexForTier > tier) {
+ LOG.error("EndingIndexForTier improperly set. Using default value.");
+ endingIndexForTier = tier;
+ }
+
+ }
+
+ /**
+ * @return Upper bound on the storefile's age on disk (current time minus minFlushTime) for it to be included in this tier
+ */
+ long getMaxAgeInDisk() {
+ return maxAgeInDisk;
+ }
+
+ /**
+ * @return Upper bound on storeFile's size to be included in this tier
+ */
+ long getMaxSize() {
+ return maxSize;
+ }
+
+ /**
+ * @return Compaction ratio for selections of this tier
+ */
+ double getCompactionRatio() {
+ return tierCompactionRatio;
+ }
+
+ /**
+ * @return lower bound on number of files in selections of this tier
+ */
+ int getMinFilesToCompact() {
+ return tierMinFilesToCompact;
+ }
+
+ /**
+ * @return upper bound on number of files in selections of this tier
+ */
+ int getMaxFilesToCompact() {
+ return tierMaxFilesToCompact;
+ }
+
+ /**
+ * @return the newest tier which will also be included in selections of this tier
+ * by default it is the index of this tier, must be between 0 and this tier
+ */
+ int getEndingIndexForTier() {
+ return endingIndexForTier;
+ }
+
+ String getDescription() {
+ String ageString = "INF";
+ String sizeString = "INF";
+ if (getMaxAgeInDisk() < Long.MAX_VALUE) {
+ ageString = StringUtils.formatTime(getMaxAgeInDisk());
+ }
+ if (getMaxSize() < Long.MAX_VALUE) {
+ sizeString = StringUtils.humanReadableInt(getMaxSize());
+ }
+ String ret = "Has files up to age " + ageString
+ + " and up to size " + sizeString + ". "
+ + "Compaction ratio: " + (new DecimalFormat("#.##")).format(getCompactionRatio()) + ", "
+ + "Compaction Selection with at least " + getMinFilesToCompact() + " and "
+ + "at most " + getMaxFilesToCompact() + " files possible, "
+ + "Selections in this tier include files up to tier " + getEndingIndexForTier();
+ return ret;
+ }
+
+ }
+
+}
\ No newline at end of file
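Editorial note: the constructor above resolves each tunable through a cascade of keys, from the most specific (this table and column family, this tier) down to the global default. The following standalone sketch is not part of the patch; it mirrors the four-level lookup with a plain map so the precedence is easy to see. The schema string is built exactly as in the constructor, which concatenates the table name and "cf." without a separating dot.

import java.util.HashMap;
import java.util.Map;

// Standalone illustration of the four-level key lookup used for
// CompactionRatio / MinFilesToCompact / MaxFilesToCompact above.
public class TierKeyResolution {
  private static final String PREFIX = "hbase.hstore.compaction.";

  // Returns the first key that is present, mirroring the nested conf.getFloat(...) calls.
  static String resolve(Map<String, String> conf, String schema, String tier, String attr) {
    String[] candidates = {
      PREFIX + schema + tier + attr,      // current family, current tier
      PREFIX + schema + attr,             // current family, default tier
      PREFIX + "Default." + tier + attr,  // Default family, current tier
      PREFIX + "Default." + attr          // Default family, default tier
    };
    for (String key : candidates) {
      if (conf.containsKey(key)) {
        return key + " = " + conf.get(key);
      }
    }
    return "built-in default";
  }

  public static void main(String[] args) {
    Map<String, String> conf = new HashMap<String, String>();
    conf.put(PREFIX + "Default.Tier.1.CompactionRatio", "1.0");
    conf.put(PREFIX + "Default.CompactionRatio", "0.0");
    // Tier 1 of any family picks up the Default-family, tier-1 value:
    System.out.println(resolve(conf, "tbl.cluster_testcf.test_cf.", "Tier.1.", "CompactionRatio"));
    // Tier 3 has no tier-specific value anywhere, so the Default-family default applies:
    System.out.println(resolve(conf, "tbl.cluster_testcf.test_cf.", "Tier.3.", "CompactionRatio"));
  }
}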
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/TierCompactionManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/TierCompactionManager.java
new file mode 100644
index 00000000000..bd9130ab770
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/TierCompactionManager.java
@@ -0,0 +1,256 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactSelection;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.util.StringUtils;
+
+import java.io.IOException;
+
+@InterfaceAudience.Private
+public class TierCompactionManager extends CompactionManager {
+
+ private static final Log LOG = LogFactory.getLog(TierCompactionManager.class);
+
+ private int[] endInTier;
+ private int[] tierOf;
+
+ private TierCompactionConfiguration tierConf;
+
+ TierCompactionManager(Configuration configuration, Store store) {
+ super(configuration, store);
+ comConf = new TierCompactionConfiguration(configuration, store);
+ tierConf = (TierCompactionConfiguration) comConf;
+ }
+
+ /**
+ * @param candidates pre-filtrate
+ * @return filtered subset
+ * -- Tier Based minor compaction selection algorithm: Choose CompactSelection from candidates --
+ *
+ * First exclude bulk-load files if indicated in configuration.
+ * Arrange files from oldest to newest then select an appropriate ['start','end') pair
+ * try 'start' from oldest to newest (smallest to largest fileIndex)
+ * for each value, identify the 'end' fileIndex
+ * stop when the range ['start','end') is an admissible compaction
+ *
+ * Notes:
+ *
+ * a compaction is admissible if
+ * - fileSize[start] is at most maxCompactSize AND
+ * - the number of files is at least currentTier.minFilesToCompact AND
+ * - (fileSize[start] is at most ratio times the total size of the remaining files in the selection OR
+ * - fileSize[start] is at most minCompactSize)
+ *
+ * 'end' is endInTier[tierOf[start].endingIndexForTier]
+ * By default currentTier.endingIndexForTier = currentTier, so in the default
+ * case 'end' is always 1 + the last fileIndex in currentTier, making sure
+ * files from different tiers are never selected together in the default case.
+ * normal skew:
+ *
+ * older ----> newer (increasing seqID, increasing minFlushTime)
+ *
+ * Tier 2 | Tier 1 | Tier 0
+ * | |
+ * _ | |
+ * | | | _ |
+ * | | | | | _ |
+ * --|-|-|-|-|- |-|-|--_-------_------- minCompactSize
+ * | | | | | | | | | | _ | |
+ * | | | | | | | | | | | | | |
+ * | | | | | | | | | | | | | |
+ */
+ @Override
+ CompactSelection applyCompactionPolicy(CompactSelection candidates) throws IOException {
+ // we're doing a minor compaction, let's see what files are applicable
+ int start = -1;
+ int end = -1;
+
+ // nothing to select if there are no candidate files
+ if (candidates.getFilesToCompact().isEmpty()) {
+ candidates.emptyFileList();
+ return candidates;
+ }
+
+ // get store file sizes for incremental compacting selection.
+ int countOfFiles = candidates.getFilesToCompact().size();
+ long[] fileSizes = new long[countOfFiles];
+ StoreFile file;
+ long[] sumSize = new long[countOfFiles + 1];
+ sumSize[countOfFiles] = 0;
+ for (int i = countOfFiles - 1; i >= 0; --i) {
+ file = candidates.getFilesToCompact().get(i);
+ fileSizes[i] = file.getReader().length();
+ // sumSize[i] holds the total size of files i..(countOfFiles-1); restSize for the ratio test is derived from these suffix sums
+ sumSize[i] = fileSizes[i] + sumSize[i + 1];
+ }
+
+ /**
+ * divide into tiers:
+ * assign tierOf[fileIndex] = tierIndex
+ * assign endInTier[tierIndex] = 1 + index of the last file in tierIndex
+ */
+ // Backward compatibility - if files with indices < i don't have the minFlushTime field, then
+ // all of them get tierOf[i]. If no file has minFlushTime, they all get tier zero.
+ int numTiers = tierConf.getNumCompactionTiers();
+ TierCompactionConfiguration.CompactionTier tier;
+ tierOf = new int[countOfFiles];
+ endInTier = new int[numTiers + 1];
+ endInTier[numTiers] = 0;
+
+ LOG.info("Applying TierCompactionPolicy with " + countOfFiles + " files");
+
+ int i;
+ int j = countOfFiles;
+
+ for (i = 0; i < numTiers; i++) {
+ tier = tierConf.getCompactionTier(i);
+ endInTier[i] = j;
+ while (j > 0) {
+ file = candidates.getFilesToCompact().get(j - 1);
+ if (!isInTier(file, tier)) {
+ break;
+ }
+ j--;
+ tierOf[j] = i;
+ }
+ }
+
+ long restSize;
+ double ratio;
+
+ //Main algorithm
+ for (j = 0; j < countOfFiles; j++) {
+ start = next(start);
+ tier = tierConf.getCompactionTier(tierOf[start]);
+ end = endInTier[tier.getEndingIndexForTier()];
+ restSize = sumSize[start + 1] - sumSize[end];
+ ratio = tier.getCompactionRatio();
+ if (fileSizes[start] <= tierConf.getMaxCompactSize() &&
+ end - start >= tier.getMinFilesToCompact() &&
+ (fileSizes[start] <= tierConf.getMinCompactSize() ||
+ (fileSizes[start] <= restSize * ratio))) {
+ break;
+ }
+ }
+ String tab = " ";
+ for (i = 0; i < numTiers; i++) {
+ LOG.info("Tier " + i + " : " + tierConf.getCompactionTier(i).getDescription());
+ if (endInTier[i] == endInTier[i+1]) {
+ LOG.info(tab + "No file is assigned to this tier.");
+ } else {
+ LOG.info(tab + (endInTier[i] - endInTier[i+1])
+ + " file(s) are assigned to this tier with serial number(s) "
+ + endInTier[i + 1] + " to " + (endInTier[i] - 1));
+ }
+ for (j = endInTier[i + 1]; j < endInTier[i]; j++) {
+ file = candidates.getFilesToCompact().get(j);
+ LOG.info(tab + tab + "SeqID = " + file.getMaxSequenceId()
+ + ", Age = " + StringUtils.formatTimeDiff(
+ EnvironmentEdgeManager.currentTimeMillis(), file.getMinFlushTime())
+ + ", Size = " + StringUtils.humanReadableInt(fileSizes[j])
+ + ", Path = " + file.getPath());
+ }
+ }
+ if (start < countOfFiles) {
+ end = Math.min(end, start
+ + tierConf.getCompactionTier(tierOf[start]).getMaxFilesToCompact());
+ }
+ if (start < end) {
+ String strTier = String.valueOf(tierOf[start]);
+ if (tierOf[end - 1] != tierOf[start]) {
+ strTier += " to " + tierOf[end - 1];
+ }
+ LOG.info("Tier Based compaction algorithm has selected " + (end - start)
+ + " files from tier " + strTier + " out of " + countOfFiles + " candidates");
+ }
+
+ candidates = candidates.getSubList(start, end);
+ return candidates;
+ }
+
+ private boolean isInTier(StoreFile file, TierCompactionConfiguration.CompactionTier tier) {
+ return file.getReader().length() <= tier.getMaxSize() &&
+ EnvironmentEdgeManager.currentTimeMillis()-file.getMinFlushTime() <= tier.getMaxAgeInDisk();
+ }
+
+ /**
+ * This function iterates over the start values in order.
+ * Whenever an admissible compaction is found, we return the selection.
+ * Hence the order matters when there is more than one admissible compaction.
+ * @param start current Value
+ * @return next Value
+ */
+ private int next(int start) {
+ if (tierConf.isRecentFirstOrder()) {
+ return backNext(start);
+ }
+ return fwdNext(start);
+ }
+
+ /**
+ * This function iterates over the start values in newer-first order of tiers,
+ * but older-first order of files within a tier.
+ * For example, suppose the tiers are:
+ * Tier 3 - files 0,1,2
+ * Tier 2 - files 3,4
+ * Tier 1 - no files
+ * Tier 0 - files 5,6,7
+ * Then the order of 'start' files will be:
+ * 5,6,7,3,4,0,1,2
+ * @param start current Value
+ * @return next Value
+ */
+ private int backNext(int start) {
+ int tier = 0;
+ if (start == -1) {
+ while (endInTier[tier] >= endInTier[0]) {
+ tier++;
+ }
+ return endInTier[tier];
+ }
+ tier = tierOf[start];
+ if (endInTier[tier] == start + 1) {
+ tier++;
+ start = endInTier[tier];
+ while (endInTier[tier] >= start) {
+ tier++;
+ }
+ return endInTier[tier];
+ }
+ return start + 1;
+ }
+
+ /**
+ * This function iterates over the start values in older-first order of files.
+ * @param start current Value
+ * @return next Value
+ */
+ private int fwdNext(int start) {
+ return start + 1;
+ }
+
+}
\ No newline at end of file
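Editorial note: the traversal order documented on backNext() can be checked in isolation. The sketch below is not part of the patch; it hard-codes the endInTier/tierOf arrays that applyCompactionPolicy() would build for the javadoc example (tier 3 = files 0,1,2; tier 2 = files 3,4; tier 1 empty; tier 0 = files 5,6,7) and replays the same iteration logic, printing the documented visit order 5,6,7,3,4,0,1,2.

// Replays the backNext() traversal for the example in its javadoc.
public class BackNextOrder {
  // endInTier[t] = index just past the newest file of tier t (or the shared
  // boundary when the tier is empty); tierOf[f] = tier of file f.
  static final int[] endInTier = {8, 5, 5, 3, 0};
  static final int[] tierOf = {3, 3, 3, 2, 2, 0, 0, 0};

  static int backNext(int start) {
    int tier = 0;
    if (start == -1) {
      while (endInTier[tier] >= endInTier[0]) {
        tier++;
      }
      return endInTier[tier];
    }
    tier = tierOf[start];
    if (endInTier[tier] == start + 1) {
      tier++;
      start = endInTier[tier];
      while (endInTier[tier] >= start) {
        tier++;
      }
      return endInTier[tier];
    }
    return start + 1;
  }

  public static void main(String[] args) {
    int start = -1;
    StringBuilder order = new StringBuilder();
    for (int i = 0; i < tierOf.length; i++) {  // exactly countOfFiles calls, as in the main loop
      start = backNext(start);
      order.append(start).append(i + 1 < tierOf.length ? "," : "");
    }
    System.out.println(order);                  // prints 5,6,7,3,4,0,1,2
  }
}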
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactSelection.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactSelection.java
index d240b87e08e..66d8a0f9e72 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactSelection.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactSelection.java
@@ -19,15 +19,13 @@
package org.apache.hadoop.hbase.regionserver.compactions;
import java.util.ArrayList;
-import java.util.Calendar;
-import java.util.GregorianCalendar;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@InterfaceAudience.Private
public class CompactSelection {
@@ -48,37 +46,15 @@ public class CompactSelection {
*/
private final static Object compactionCountLock = new Object();
- // HBase conf object
- Configuration conf;
// was this compaction promoted to an off-peak
boolean isOffPeakCompaction = false;
- // compactRatio: double on purpose! Float.MAX < Long.MAX < Double.MAX
- // With float, java will downcast your long to float for comparisons (bad)
- private double compactRatio;
- // compaction ratio off-peak
- private double compactRatioOffPeak;
- // offpeak start time
- private int offPeakStartHour = -1;
- // off peak end time
- private int offPeakEndHour = -1;
+ // CompactSelection object creation time.
+ private final long selectionTime;
- public CompactSelection(Configuration conf, List<StoreFile> filesToCompact) {
+ public CompactSelection(List<StoreFile> filesToCompact) {
+ this.selectionTime = EnvironmentEdgeManager.currentTimeMillis();
this.filesToCompact = filesToCompact;
- this.conf = conf;
- this.compactRatio = conf.getFloat("hbase.hstore.compaction.ratio", 1.2F);
- this.compactRatioOffPeak = conf.getFloat("hbase.hstore.compaction.ratio.offpeak", 5.0F);
-
- // Peak time is from [offPeakStartHour, offPeakEndHour). Valid numbers are [0, 23]
- this.offPeakStartHour = conf.getInt("hbase.offpeak.start.hour", -1);
- this.offPeakEndHour = conf.getInt("hbase.offpeak.end.hour", -1);
- if (!isValidHour(this.offPeakStartHour) || !isValidHour(this.offPeakEndHour)) {
- if (!(this.offPeakStartHour == -1 && this.offPeakEndHour == -1)) {
- LOG.warn("Invalid start/end hour for peak hour : start = " +
- this.offPeakStartHour + " end = " + this.offPeakEndHour +
- ". Valid numbers are [0-23]");
- }
- this.offPeakStartHour = this.offPeakEndHour = -1;
- }
+ this.isOffPeakCompaction = false;
}
/**
@@ -113,49 +89,25 @@ public class CompactSelection {
}
if (hasExpiredStoreFiles) {
- expiredSFSelection = new CompactSelection(conf, expiredStoreFiles);
+ expiredSFSelection = new CompactSelection(expiredStoreFiles);
}
return expiredSFSelection;
}
- /**
- * If the current hour falls in the off peak times and there are no
- * outstanding off peak compactions, the current compaction is
- * promoted to an off peak compaction. Currently only one off peak
- * compaction is present in the compaction queue.
- *
- * @param currentHour
- * @return
- */
- public double getCompactSelectionRatio() {
- double r = this.compactRatio;
- synchronized(compactionCountLock) {
- if (isOffPeakHour() && numOutstandingOffPeakCompactions == 0) {
- r = this.compactRatioOffPeak;
- numOutstandingOffPeakCompactions++;
- isOffPeakCompaction = true;
- }
- }
- if(isOffPeakCompaction) {
- LOG.info("Running an off-peak compaction, selection ratio = " +
- compactRatioOffPeak + ", numOutstandingOffPeakCompactions is now " +
- numOutstandingOffPeakCompactions);
- }
- return r;
- }
-
/**
* The current compaction finished, so reset the off peak compactions count
* if this was an off peak compaction.
*/
public void finishRequest() {
if (isOffPeakCompaction) {
+ long newValueToLog = -1;
synchronized(compactionCountLock) {
- numOutstandingOffPeakCompactions--;
+ assert isOffPeakCompaction : "Double-counting off-peak count for compaction";
+ newValueToLog = --numOutstandingOffPeakCompactions;
isOffPeakCompaction = false;
}
LOG.info("Compaction done, numOutstandingOffPeakCompactions is now " +
- numOutstandingOffPeakCompactions);
+ newValueToLog);
}
}
@@ -170,13 +122,14 @@ public class CompactSelection {
public void emptyFileList() {
filesToCompact.clear();
if (isOffPeakCompaction) {
+ long newValueToLog = -1;
synchronized(compactionCountLock) {
// reset the off peak count
- numOutstandingOffPeakCompactions--;
+ newValueToLog = --numOutstandingOffPeakCompactions;
isOffPeakCompaction = false;
}
LOG.info("Nothing to compact, numOutstandingOffPeakCompactions is now " +
- numOutstandingOffPeakCompactions);
+ newValueToLog);
}
}
@@ -184,16 +137,30 @@ public class CompactSelection {
return this.isOffPeakCompaction;
}
- private boolean isOffPeakHour() {
- int currentHour = (new GregorianCalendar()).get(Calendar.HOUR_OF_DAY);
- // If offpeak time checking is disabled just return false.
- if (this.offPeakStartHour == this.offPeakEndHour) {
- return false;
+ public static long getNumOutStandingOffPeakCompactions() {
+ synchronized(compactionCountLock) {
+ return numOutstandingOffPeakCompactions;
}
- if (this.offPeakStartHour < this.offPeakEndHour) {
- return (currentHour >= this.offPeakStartHour && currentHour < this.offPeakEndHour);
+ }
+
+ /**
+ * Tries making the compaction off-peak.
+ * Only checks internal compaction constraints, not timing.
+ * @return Eventual value of isOffPeakCompaction.
+ */
+ public boolean trySetOffpeak() {
+ assert !isOffPeakCompaction : "Double-setting off-peak for compaction " + this;
+ synchronized(compactionCountLock) {
+ if (numOutstandingOffPeakCompactions == 0) {
+ numOutstandingOffPeakCompactions++;
+ isOffPeakCompaction = true;
+ }
}
- return (currentHour >= this.offPeakStartHour || currentHour < this.offPeakEndHour);
+ return isOffPeakCompaction;
+ }
+
+ public long getSelectionTime() {
+ return selectionTime;
}
public CompactSelection subList(int start, int end) {
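Editorial note: with the off-peak window check removed from CompactSelection, the caller now decides the timing and only the bookkeeping stays here. The sketch below is illustrative rather than part of the patch: a hypothetical selection policy that already knows it is inside the off-peak window pairs trySetOffpeak() with finishRequest() so the single outstanding off-peak slot is always released.

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.compactions.CompactSelection;

public class OffPeakAccountingSketch {
  static CompactSelection select(List<StoreFile> candidates, boolean inOffPeakWindow) {
    CompactSelection selection = new CompactSelection(candidates);
    if (inOffPeakWindow) {
      // Only one off-peak compaction may be outstanding; trySetOffpeak() enforces that.
      boolean promoted = selection.trySetOffpeak();
      System.out.println("off-peak promotion: " + promoted + ", outstanding = "
          + CompactSelection.getNumOutStandingOffPeakCompactions());
    }
    return selection;
  }

  static void finish(CompactSelection selection) {
    // The caller that decided the timing must also release the off-peak slot.
    selection.finishRequest();
  }

  public static void main(String[] args) {
    CompactSelection selection = select(new ArrayList<StoreFile>(), true);
    finish(selection);
  }
}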
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java
index bee966800ca..b98dec339dd 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java
@@ -208,6 +208,10 @@ public class CompactionRequest implements Comparable<CompactionRequest>,
return p;
}
+ public long getSelectionTime() {
+ return compactSelection.getSelectionTime();
+ }
+
/** Gets the priority for the request */
public void setPriority(int p) {
this.p = p;
@@ -272,7 +276,7 @@ public class CompactionRequest implements Comparable,
server.checkFileSystem();
} finally {
s.finishRequest(this);
- LOG.debug("CompactSplitThread status: " + server.compactSplitThread);
+ LOG.debug("CompactSplitThread Status: " + server.compactSplitThread);
}
}
diff --git a/hbase-server/src/main/resources/hbase-compactions.xml b/hbase-server/src/main/resources/hbase-compactions.xml
new file mode 100644
index 00000000000..2d06e29dd64
--- /dev/null
+++ b/hbase-server/src/main/resources/hbase-compactions.xml
@@ -0,0 +1,160 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<configuration>
+
+ <property>
+ <name>hbase.hstore.compactionThreshold</name>
+ <value>3</value>
+ <description>
+ If more than this number of HStoreFiles in any one HStore
+ (one HStoreFile is written per flush of memstore) then a compaction
+ is run to rewrite all HStoreFiles files as one. Larger numbers
+ put off compaction but when it runs, it takes longer to complete.
+ During a compaction, updates cannot be flushed to disk. Long
+ compactions require memory sufficient to carry the logging of
+ all updates across the duration of the compaction.
+
+ If too large, clients timeout during compaction.
+ </description>
+ </property>
+
+ <property>
+ <name>hbase.hstore.compaction.max</name>
+ <value>10</value>
+ <description>Max number of HStoreFiles to compact per 'minor' compaction.</description>
+ </property>
+
+ <property>
+ <name>hbase.hregion.majorcompaction</name>
+ <value>86400000</value>
+ <description>The time (in milliseconds) between 'major' compactions of all
+ HStoreFiles in a region. Default: 1 day.
+ Set to 0 to disable automated major compactions.
+ </description>
+ </property>
+
+ <property>
+ <name>hbase.hstore.compaction.CompactionPolicy</name>
+ <value>TierBasedCompactionPolicy</value>
+ <description>The compaction policy which should be used</description>
+ </property>
+
+ <property>
+ <name>hbase.hstore.compaction.tbl.cluster_test.cf.test_cf.NumCompactionTiers</name>
+ <value>4</value>
+ <description>The number of tiers into which the files are assigned</description>
+ </property>
+
+ <property>
+ <name>hbase.hstore.compaction.Default.Tier.0.MaxAgeInDisk</name>
+ <value>3600000</value>
+ <description>Length of time for which flush files are in 1st tier
+ value one hour.
+ </description>
+ </property>
+
+ <property>
+ <name>hbase.hstore.compaction.tbl.cluster_test.cf.test_cf.Tier.1.MaxAgeInDisk</name>
+ <value>10800000</value>
+ <description>Maximum age of a file to be in second tier
+ value 3 hours.
+ </description>
+ </property>
+
+ <property>
+ <name>hbase.hstore.compaction.Default.Tier.2.MaxAgeInDisk</name>
+ <value>36000000</value>
+ <description>Maximum age of a file to be in third tier
+ value 10 hours
+ </description>
+ </property>
+
+ <property>
+ <name>hbase.hstore.compaction.Default.CompactionRatio</name>
+ <value>0.0</value>
+ <description>The default compaction ratio used if unspecified.
+ value 0.
+ </description>
+ </property>
+
+ <property>
+ <name>hbase.hstore.compaction.Default.Tier.1.CompactionRatio</name>
+ <value>1.0</value>
+ <description>The compaction ratio for the second tier.
+ value 1.0.
+ </description>
+ </property>
+
+ <property>
+ <name>hbase.hstore.compaction.Default.Tier.2.CompactionRatio</name>
+ <value>0.75</value>
+ <description>The compaction ratio for the third tier.
+ value 0.75.
+ </description>
+ </property>
+
+ <property>
+ <name>hbase.hstore.compaction.Default.Tier.3.CompactionRatio</name>
+ <value>0.2</value>
+ <description>The compaction ratio for the fourth tier.
+ value 0.2.
+ </description>
+ </property>
+
+ <property>
+ <name>hbase.hstore.compaction.min</name>
+ <value>2</value>
+ <description>Default minimum number of files to compact
+ value 2.
+ </description>
+ </property>
+
+ <property>
+ <name>hbase.hstore.compaction.tbl.cluster_test.cf.MinFilesToCompact</name>
+ <value>3</value>
+ <description>Overridden Default minimum number of files to compact
+ value 3.
+ </description>
+ </property>
+
+ <property>
+ <name>hbase.hstore.compaction.max</name>
+ <value>7</value>
+ <description>Default maximum number of files to compact
+ value 7.
+ </description>
+ </property>
+
+ <property>
+ <name>hbase.hstore.compaction.Default.Tier.1.MinFilesToCompact</name>
+ <value>2</value>
+ <description>minimum number of files to compact in second tier
+ value 2.
+ </description>
+ </property>
+
+ <property>
+ <name>hbase.hstore.compaction.Default.Tier.3.MaxFilesToCompact</name>
+ <value>6</value>
+ <description>maximum number of files to compact in fourth tier
+ value 6.
+ </description>
+ </property>
+
+ <property>
+ <name>hbase.hstore.compaction.Default.Tier.2.EndingIndexForTier</name>
+ <value>1</value>
+ <description>The minimum tier whose files go together with this tier
+ value 1.
+ </description>
+ </property>
+
+</configuration>
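Editorial note: read together, the three MaxAgeInDisk values above slice flush files into age bands of one hour, three hours, ten hours, and everything older. The standalone sketch below is not part of the patch and shows only the age dimension; the real assignment in TierCompactionManager also honours each tier's MaxSize and walks the candidate files from newest to oldest.

public class AgeToTierSketch {
  // MaxAgeInDisk per tier, in milliseconds, taken from the sample file:
  // tier 0 = 1 hour, tier 1 = 3 hours, tier 2 = 10 hours, tier 3 = unbounded.
  static final long[] MAX_AGE_MS = {3600000L, 10800000L, 36000000L, Long.MAX_VALUE};

  // Returns the lowest tier index whose age cut-off covers the given age.
  static int tierForAge(long ageMs) {
    for (int tier = 0; tier < MAX_AGE_MS.length; tier++) {
      if (ageMs <= MAX_AGE_MS[tier]) {
        return tier;
      }
    }
    return MAX_AGE_MS.length - 1;
  }

  public static void main(String[] args) {
    System.out.println(tierForAge(30L * 60 * 1000));       // 30 minutes -> tier 0
    System.out.println(tierForAge(2L * 60 * 60 * 1000));   // 2 hours    -> tier 1
    System.out.println(tierForAge(5L * 60 * 60 * 1000));   // 5 hours    -> tier 2
    System.out.println(tierForAge(24L * 60 * 60 * 1000));  // 1 day      -> tier 3
  }
}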
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
index 0fd274a5255..edd6085e412 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
@@ -303,6 +303,8 @@ public class TestCompaction extends HBaseTestCase {
conf.setFloat("hbase.hregion.majorcompaction.jitter", jitterPct);
HStore s = ((HStore) r.getStore(COLUMN_FAMILY));
+ // TODO: temporary call, until HBASE-3909 is committed in some form.
+ s.updateConfiguration();
try {
createStoreFile(r);
createStoreFile(r);
@@ -314,9 +316,10 @@ public class TestCompaction extends HBaseTestCase {
assertEquals(2, s.getStorefilesCount());
// ensure that major compaction time is deterministic
- long mcTime = s.getNextMajorCompactTime();
+ CompactionManager c = s.compactionManager;
+ long mcTime = c.getNextMajorCompactTime();
for (int i = 0; i < 10; ++i) {
- assertEquals(mcTime, s.getNextMajorCompactTime());
+ assertEquals(mcTime, c.getNextMajorCompactTime());
}
// ensure that the major compaction time is within the variance
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactSelection.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultCompactSelection.java
similarity index 74%
rename from hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactSelection.java
rename to hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultCompactSelection.java
index 4a26f21f5bb..498959236d9 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactSelection.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultCompactSelection.java
@@ -1,5 +1,4 @@
/**
- *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -26,6 +25,7 @@ import java.util.GregorianCalendar;
import java.util.List;
import junit.framework.TestCase;
+import org.junit.experimental.categories.Category;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -39,26 +39,27 @@ import org.apache.hadoop.hbase.regionserver.compactions.CompactSelection;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import com.google.common.collect.Lists;
-import org.junit.experimental.categories.Category;
@Category(SmallTests.class)
-public class TestCompactSelection extends TestCase {
- private final static Log LOG = LogFactory.getLog(TestCompactSelection.class);
+public class TestDefaultCompactSelection extends TestCase {
+ private final static Log LOG = LogFactory.getLog(TestDefaultCompactSelection.class);
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
- private Configuration conf;
- private HStore store;
+ protected Configuration conf;
+ protected HStore store;
private static final String DIR=
TEST_UTIL.getDataTestDir("TestCompactSelection").toString();
private static Path TEST_FILE;
+ private CompactionManager manager;
- private static final int minFiles = 3;
- private static final int maxFiles = 5;
+ protected static final int minFiles = 3;
+ protected static final int maxFiles = 5;
- private static final long minSize = 10;
- private static final long maxSize = 1000;
+ protected static final long minSize = 10;
+ protected static final long maxSize = 1000;
@Override
@@ -94,6 +95,8 @@ public class TestCompactSelection extends TestCase {
region = new HRegion(tableDir, hlog, fs, conf, info, htd, null);
store = new HStore(basedir, region, hcd, fs, conf);
+ manager = store.compactionManager;
+
TEST_FILE = StoreFile.getRandomFilename(fs, store.getHomedir());
fs.create(TEST_FILE);
}
@@ -102,20 +105,41 @@ public class TestCompactSelection extends TestCase {
static class MockStoreFile extends StoreFile {
long length = 0;
boolean isRef = false;
+ long ageInDisk;
+ long sequenceid;
- MockStoreFile(long length, boolean isRef) throws IOException {
- super(TEST_UTIL.getTestFileSystem(), TEST_FILE,
- TEST_UTIL.getConfiguration(),
+ MockStoreFile(long length, long ageInDisk, boolean isRef, long sequenceid) throws IOException {
+ super(TEST_UTIL.getTestFileSystem(), TEST_FILE, TEST_UTIL.getConfiguration(),
new CacheConfig(TEST_UTIL.getConfiguration()), BloomType.NONE,
NoOpDataBlockEncoder.INSTANCE);
this.length = length;
- this.isRef = isRef;
+ this.isRef = isRef;
+ this.ageInDisk = ageInDisk;
+ this.sequenceid = sequenceid;
}
void setLength(long newLen) {
this.length = newLen;
}
+ @Override
+ public boolean hasMinFlushTime() {
+ return ageInDisk != 0;
+ }
+
+ @Override
+ public long getMinFlushTime() {
+ if (ageInDisk < 0) {
+ return ageInDisk;
+ }
+ return EnvironmentEdgeManager.currentTimeMillis() - ageInDisk;
+ }
+
+ @Override
+ public long getMaxSequenceId() {
+ return sequenceid;
+ }
+
@Override
boolean isMajorCompaction() {
return false;
@@ -138,43 +162,70 @@ public class TestCompactSelection extends TestCase {
}
}
- List<StoreFile> sfCreate(long ... sizes) throws IOException {
- return sfCreate(false, sizes);
+ ArrayList<Long> toArrayList(long... numbers) {
+ ArrayList<Long> result = new ArrayList<Long>();
+ for (long i : numbers) {
+ result.add(i);
+ }
+ return result;
}
- List<StoreFile> sfCreate(boolean isReference, long ... sizes)
- throws IOException {
+ List<StoreFile> sfCreate(long... sizes) throws IOException {
+ ArrayList<Long> ageInDisk = new ArrayList<Long>();
+ for (int i = 0; i < sizes.length; i++) {
+ ageInDisk.add(0L);
+ }
+ return sfCreate(toArrayList(sizes), ageInDisk);
+ }
+
+ List<StoreFile> sfCreate(ArrayList<Long> sizes, ArrayList<Long> ageInDisk)
+ throws IOException {
+ return sfCreate(false, sizes, ageInDisk);
+ }
+
+ List<StoreFile> sfCreate(boolean isReference, long... sizes) throws IOException {
+ ArrayList<Long> ageInDisk = new ArrayList<Long>(sizes.length);
+ for (int i = 0; i < sizes.length; i++) {
+ ageInDisk.add(0L);
+ }
+ return sfCreate(isReference, toArrayList(sizes), ageInDisk);
+ }
+
+ List<StoreFile> sfCreate(boolean isReference, ArrayList<Long> sizes, ArrayList<Long> ageInDisk)
+ throws IOException {
List<StoreFile> ret = Lists.newArrayList();
- for (long i : sizes) {
- ret.add(new MockStoreFile(i, isReference));
+ for (int i = 0; i < sizes.size(); i++) {
+ ret.add(new MockStoreFile(sizes.get(i), ageInDisk.get(i), isReference, i));
}
return ret;
}
long[] getSizes(List<StoreFile> sfList) {
long[] aNums = new long[sfList.size()];
for (int i = 0; i < sfList.size(); i++) {
aNums[i] = sfList.get(i).getReader().length();
}
return aNums;
}
- void compactEquals(List<StoreFile> candidates, long ... expected)
- throws IOException {
+
+ void compactEquals(List<StoreFile> candidates, long... expected)
+ throws IOException {
compactEquals(candidates, false, expected);
}
- void compactEquals(List<StoreFile> candidates, boolean forcemajor,
+ void compactEquals(List<StoreFile> candidates, boolean forcemajor,
long ... expected)
throws IOException {
store.forceMajor = forcemajor;
- List<StoreFile> actual = store.compactSelection(candidates).getFilesToCompact();
- store.forceMajor = false;
+ //Test Default compactions
+ List<StoreFile> actual = store.compactionManager
+ .selectCompaction(candidates, Store.NO_PRIORITY, forcemajor).getFilesToCompact();
assertEquals(Arrays.toString(expected), Arrays.toString(getSizes(actual)));
+ store.forceMajor = false;
}
public void testCompactionRatio() throws IOException {
- /*
+ /**
* NOTE: these tests are specific to describe the implementation of the
* current compaction algorithm. Developed to ensure that refactoring
* doesn't implicitly alter this.
@@ -191,17 +242,15 @@ public class TestCompactSelection extends TestCase {
compactEquals(sfCreate(tooBig, tooBig, 700,700) /* empty */);
// small files = don't care about ratio
compactEquals(sfCreate(8,3,1), 8,3,1);
- /* TODO: add sorting + unit test back in when HBASE-2856 is fixed
- // sort first so you don't include huge file the tail end
+ /* TODO: add sorting + unit test back in when HBASE-2856 is fixed
+ // sort first so you don't include huge file the tail end.
// happens with HFileOutputFormat bulk migration
compactEquals(sfCreate(100,50,23,12,12, 500), 23, 12, 12);
*/
// don't exceed max file compact threshold
- assertEquals(maxFiles,
- store.compactSelection(sfCreate(7,6,5,4,3,2,1)).getFilesToCompact().size());
// note: file selection starts with largest to smallest.
compactEquals(sfCreate(7, 6, 5, 4, 3, 2, 1), 7, 6, 5, 4, 3);
-
+
/* MAJOR COMPACTION */
// if a major compaction has been forced, then compact everything
compactEquals(sfCreate(50,25,12,12), true, 50, 25, 12, 12);
@@ -211,15 +260,18 @@ public class TestCompactSelection extends TestCase {
compactEquals(sfCreate(tooBig, 12,12), true, tooBig, 12, 12);
// don't exceed max file compact threshold, even with major compaction
store.forceMajor = true;
+ assertEquals(maxFiles,
+ manager.selectCompaction(sfCreate(7, 6, 5, 4, 3, 2, 1), Store.NO_PRIORITY, false)
+ .getFilesToCompact().size());
compactEquals(sfCreate(7, 6, 5, 4, 3, 2, 1), 7, 6, 5, 4, 3);
store.forceMajor = false;
-
// if we exceed maxCompactSize, downgrade to minor
// if not, it creates a 'snowball effect' when files >> maxCompactSize:
// the last file in compaction is the aggregate of all previous compactions
compactEquals(sfCreate(100,50,23,12,12), true, 23, 12, 12);
conf.setLong(HConstants.MAJOR_COMPACTION_PERIOD, 1);
conf.setFloat("hbase.hregion.majorcompaction.jitter", 0);
+ store.updateConfiguration();
try {
// trigger an aged major compaction
compactEquals(sfCreate(50,25,12,12), 50, 25, 12, 12);
@@ -236,15 +288,12 @@ public class TestCompactSelection extends TestCase {
// reference files shouldn't obey max threshold
compactEquals(sfCreate(true, tooBig, 12,12), tooBig, 12, 12);
// reference files should obey max file compact to avoid OOM
- assertEquals(maxFiles,
- store.compactSelection(sfCreate(true, 7,6,5,4,3,2,1)).getFilesToCompact().size());
- // reference compaction
- compactEquals(sfCreate(true, 7, 6, 5, 4, 3, 2, 1), 5, 4, 3, 2, 1);
-
+ compactEquals(sfCreate(true, 7, 6, 5, 4, 3, 2, 1), 7, 6, 5, 4, 3);
+
// empty case
compactEquals(new ArrayList<StoreFile>() /* empty */);
// empty case (because all files are too big)
- compactEquals(sfCreate(tooBig, tooBig) /* empty */);
+ compactEquals(sfCreate(tooBig, tooBig) /* empty */);
}
public void testOffPeakCompactionRatio() throws IOException {
@@ -258,7 +307,7 @@ public class TestCompactSelection extends TestCase {
Calendar calendar = new GregorianCalendar();
int hourOfDay = calendar.get(Calendar.HOUR_OF_DAY);
LOG.debug("Hour of day = " + hourOfDay);
- int hourPlusOne = ((hourOfDay+1+24)%24);
+ int hourPlusOne = ((hourOfDay+1)%24);
int hourMinusOne = ((hourOfDay-1+24)%24);
int hourMinusTwo = ((hourOfDay-2+24)%24);
@@ -274,15 +323,16 @@ public class TestCompactSelection extends TestCase {
this.conf.setLong("hbase.offpeak.end.hour", hourPlusOne);
LOG.debug("Testing compact selection with off-peak settings (" +
hourMinusOne + ", " + hourPlusOne + ")");
- compactEquals(sfCreate(999,50,12,12, 1), 50, 12, 12, 1);
+ // update the compaction policy to include conf changes
+ store.setCompactionPolicy(CompactionManager.class.getName());
+ compactEquals(sfCreate(999, 50, 12, 12, 1), 50, 12, 12, 1);
// set peak hour outside current selection and check compact selection
this.conf.setLong("hbase.offpeak.start.hour", hourMinusTwo);
this.conf.setLong("hbase.offpeak.end.hour", hourMinusOne);
+ store.setCompactionPolicy(CompactionManager.class.getName());
LOG.debug("Testing compact selection with off-peak settings (" +
hourMinusTwo + ", " + hourMinusOne + ")");
compactEquals(sfCreate(999,50,12,12, 1), 12, 12, 1);
}
-
}
-
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
index f0f581ee00d..4c029878cb8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
@@ -216,17 +216,15 @@ public class TestStore extends TestCase {
flush(i);
}
// after flush; check the lowest time stamp
- long lowestTimeStampFromStore =
- HStore.getLowestTimestamp(store.getStorefiles());
- long lowestTimeStampFromFS =
- getLowestTimeStampFromFS(fs,store.getStorefiles());
- assertEquals(lowestTimeStampFromStore,lowestTimeStampFromFS);
-
+ long lowestTimeStampFromManager = CompactionManager.getLowestTimestamp(store.getStorefiles());
+ long lowestTimeStampFromFS = getLowestTimeStampFromFS(fs, store.getStorefiles());
+ assertEquals(lowestTimeStampFromManager,lowestTimeStampFromFS);
+
// after compact; check the lowest time stamp
store.compact(store.requestCompaction());
- lowestTimeStampFromStore = HStore.getLowestTimestamp(store.getStorefiles());
- lowestTimeStampFromFS = getLowestTimeStampFromFS(fs,store.getStorefiles());
- assertEquals(lowestTimeStampFromStore,lowestTimeStampFromFS);
+ lowestTimeStampFromManager = CompactionManager.getLowestTimestamp(store.getStorefiles());
+ lowestTimeStampFromFS = getLowestTimeStampFromFS(fs, store.getStorefiles());
+ assertEquals(lowestTimeStampFromManager, lowestTimeStampFromFS);
}
private static long getLowestTimeStampFromFS(FileSystem fs,
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestTierCompactSelection.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestTierCompactSelection.java
new file mode 100644
index 00000000000..37ad00eb778
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestTierCompactSelection.java
@@ -0,0 +1,318 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.*;
+
+import org.junit.experimental.categories.Category;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.SmallTests;
+
+@Category(SmallTests.class)
+public class TestTierCompactSelection extends TestDefaultCompactSelection {
+ private final static Log LOG = LogFactory.getLog(TestTierCompactSelection.class);
+
+ private static final int numTiers = 4;
+
+ private String strPrefix, strSchema, strTier;
+
+
+ @Override
+ public void setUp() throws Exception {
+
+ super.setUp();
+
+ // setup config values necessary for store
+ strPrefix = "hbase.hstore.compaction.";
+ strSchema = "tbl." + store.getHRegion().getTableDesc().getNameAsString()
+ + "cf." + store.getFamily().getNameAsString() + ".";
+
+ this.conf.setStrings(strPrefix + "CompactionPolicy", "TierBasedCompactionPolicy");
+
+ this.conf.setLong(HConstants.MAJOR_COMPACTION_PERIOD, 0);
+
+ // The following parameters are for default compaction
+ // Some of them are used as default values of tier based compaction
+ this.conf.setInt(strPrefix + "min", 2);
+ this.conf.setInt(strPrefix + "max", 10);
+ this.conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 0);
+ this.conf.setLong(strPrefix + "max.size", 10000);
+ this.conf.setFloat(strPrefix + "ratio", 10.0F);
+
+ // Specifying the family parameters here
+ conf.setInt(strPrefix + strSchema + "NumCompactionTiers", numTiers);
+ conf.setLong(strPrefix + strSchema + "MinCompactSize", minSize);
+ conf.setLong(strPrefix + strSchema + "MaxCompactSize", maxSize);
+
+ // Specifying parameters for the default tier
+ strTier = "";
+ conf.setFloat(strPrefix + strSchema + strTier + "CompactionRatio", 0.1F);
+ conf.setInt(strPrefix + strSchema + strTier + "MinFilesToCompact", minFiles);
+ conf.setInt(strPrefix + strSchema + strTier + "MaxFilesToCompact", maxFiles);
+
+ // Specifying parameters for individual tiers here
+
+ // Don't compact in this tier (likely to be in block cache)
+ strTier = "Tier.0.";
+ conf.setFloat(strPrefix + strSchema + strTier + "CompactionRatio", 0.0F);
+
+ // Most aggressive tier
+ strTier = "Tier.1.";
+ conf.setFloat(strPrefix + strSchema + strTier + "CompactionRatio", 2.0F);
+ conf.setInt(strPrefix + strSchema + strTier + "MinFilesToCompact", 2);
+ conf.setInt(strPrefix + strSchema + strTier + "MaxFilesToCompact", 10);
+
+ // Medium tier
+ strTier = "Tier.2.";
+ conf.setFloat(strPrefix + strSchema + strTier + "CompactionRatio", 1.0F);
+ // Also include files in tier 1 here
+ conf.setInt(strPrefix + strSchema + strTier + "EndingIndexForTier", 1);
+
+ // Last tier - least aggressive compaction
+ // has default tier settings only
+ // Max Time elapsed is Infinity by default
+
+ }
+
+ @Override
+ void compactEquals(
+ List<StoreFile> candidates, boolean forcemajor,
+ long... expected
+ )
+ throws IOException {
+ store.forceMajor = forcemajor;
+ //update the policy for now in case any change
+ store.setCompactionPolicy(TierCompactionManager.class.getName());
+ List<StoreFile> actual =
+ store.compactionManager.selectCompaction(candidates, Store.NO_PRIORITY, forcemajor).getFilesToCompact();
+ store.forceMajor = false;
+ assertEquals(Arrays.toString(expected), Arrays.toString(getSizes(actual)));
+ }
+
+ public void testAgeBasedAssignment() throws IOException {
+
+ conf.setLong(strPrefix + strSchema + "Tier.0.MaxAgeInDisk", 10L);
+ conf.setLong(strPrefix + strSchema + "Tier.1.MaxAgeInDisk", 100L);
+ conf.setLong(strPrefix + strSchema + "Tier.2.MaxAgeInDisk", 1000L);
+ conf.setLong(strPrefix + strSchema + "Tier.0.MaxSize", Long.MAX_VALUE);
+ conf.setLong(strPrefix + strSchema + "Tier.1.MaxSize", Long.MAX_VALUE);
+ conf.setLong(strPrefix + strSchema + "Tier.2.MaxSize", Long.MAX_VALUE);
+
+ //everything in first tier, don't compact!
+ compactEquals(sfCreate(toArrayList(
+ 151, 30, 13, 12, 11 ), toArrayList( // Sizes
+ 8, 5, 4, 2, 1 )) // ageInDisk ( = currentTime - minFlushTime)
+ /* empty expected */ ); // Selected sizes
+
+ //below minSize should compact
+ compactEquals(sfCreate(toArrayList(
+ 12, 11, 8, 3, 1 ), toArrayList(
+ 8, 5, 4, 2, 1 )),
+ 8, 3, 1 );
+
+ //everything in second tier
+ compactEquals(sfCreate(toArrayList(
+ 251, 70, 13, 12, 11 ), toArrayList(
+ 80, 50, 40, 20, 11 )),
+ 70, 13, 12, 11 );
+
+ //everything in third tier
+ compactEquals(sfCreate(toArrayList(
+ 251, 70, 13, 12, 11 ), toArrayList(
+ 800, 500, 400, 200, 110 )),
+ 13, 12, 11 );
+
+ //everything in fourth tier
+ compactEquals(sfCreate(toArrayList(
+ 251, 70, 13, 12, 11 ), toArrayList(
+ 8000, 5000, 4000, 2000, 1100 ))
+ /* empty expected */ );
+
+ //Valid compaction in 4th tier with ratio 0.10, hits maxFilesToCompact
+ compactEquals(sfCreate(toArrayList(
+ 500, 93, 92, 91, 90, 89, 88, 87, 86, 85, 84, 83, 82, 81, 80 ), toArrayList(
+ 5094, 5093, 5092, 5091, 5090, 5089, 5088, 5087, 5086, 5085, 5084, 5083, 5082, 5081, 5080)),
+ 93, 92, 91, 90, 89 );
+
+ //Now mixing tiers 1,0, expected selection in tier 1 only
+ compactEquals(sfCreate(toArrayList(
+ 999, 110, 100, 12, 11 ), toArrayList(
+ 90, 80, 50, 4, 1 )),
+ 110, 100 );
+
+ //Mixing tier 2,1, expected selection in tier 2 including tier 1 but not zero
+ compactEquals(sfCreate(toArrayList(
+ 999, 110, 100, 12, 11 ), toArrayList(
+ 900, 800, 500, 40, 1 )),
+ 110, 100, 12 );
+
+ //Mixing tier 2,1, expected selection in tier 1 because of recentFirstOrder = true
+ compactEquals(sfCreate(toArrayList(
+ 999, 110, 100, 12, 13, 11 ), toArrayList(
+ 900, 800, 500, 40, 30, 1 )),
+ 12, 13 );
+
+ conf.setBoolean(strPrefix + strSchema + "IsRecentFirstOrder", false);
+
+ //Mixing tier 2,1, expected selection in tier 1 because of recentFirstOrder = false
+ compactEquals(sfCreate(toArrayList(
+ 999, 110, 100, 12, 13, 11 ), toArrayList(
+ 900, 800, 500, 40, 30, 1 )),
+ 110, 100, 12, 13 );
+
+ //Mixing all tier 3,2,1,0 expected selection in tier 1 only
+ compactEquals(sfCreate(toArrayList(
+ 999, 800, 110, 100, 12, 13, 11 ), toArrayList(
+ 9000, 800, 50, 40, 8, 3, 1 )),
+ 110, 100 );
+
+ //Checking backward compatibility, first 3 files don't have minFlushTime,
+ //all should go to tier 1, not tier 0
+ compactEquals(sfCreate(toArrayList(
+ 999, 800, 110, 100, 12, 13, 11 ), toArrayList(
+ 0, 0, 0, 40, 8, 3, 1 )),
+ 999, 800, 110, 100 );
+
+ //make sure too big files don't get compacted
+ compactEquals(sfCreate(toArrayList(
+ 1002, 1001, 999, 800, 700, 12, 13, 11 ), toArrayList(
+ 900, 80, 50, 40, 30, 20, 4, 2 )),
+ 999, 800, 700, 12 );
+
+ }
+
+ public void testSizeBasedAssignment() throws IOException {
+
+ conf.setLong(strPrefix + strSchema + "MinCompactSize", 3);
+
+ conf.setLong(strPrefix + strSchema + "Tier.0.MaxSize", 10L);
+ conf.setLong(strPrefix + strSchema + "Tier.1.MaxSize", 100L);
+ conf.setLong(strPrefix + strSchema + "Tier.2.MaxSize", 1000L);
+ conf.setLong(strPrefix + strSchema + "Tier.0.MaxAgeInDisk", Long.MAX_VALUE);
+ conf.setLong(strPrefix + strSchema + "Tier.1.MaxAgeInDisk", Long.MAX_VALUE);
+ conf.setLong(strPrefix + strSchema + "Tier.2.MaxAgeInDisk", Long.MAX_VALUE);
+
+ compactEquals(sfCreate(false,
+ 500, 3, 2, 1 ),
+ 3, 2, 1 );
+
+ compactEquals(sfCreate(false,
+ 500, 8, 7, 6, 5, 4, 2, 1 )
+ /* empty */ );
+
+ compactEquals(sfCreate(false,
+ 500, 6, 8, 4, 7, 4, 2, 1 )
+ /* empty */ );
+
+ compactEquals(sfCreate(false,
+ 500, 23, 11, 8, 4, 1 )
+ /* empty */ );
+
+ compactEquals(sfCreate(false,
+ 500, 11, 23, 8, 4, 1 ),
+ 11, 23 );
+
+ compactEquals(sfCreate(false,
+ 500, 9, 23, 8, 4, 1 ),
+ 9, 23 );
+
+ compactEquals(sfCreate(false,
+ 500, 70, 23, 11, 8, 4, 1 )
+ /* empty */ );
+
+ compactEquals(sfCreate(false,
+ 500, 60, 23, 11, 8, 4, 1 ),
+ 60, 23, 11 );
+
+ compactEquals(sfCreate(false,
+ 500, 90, 60, 23, 11, 8, 4, 1 ),
+ 90, 60, 23, 11 );
+
+ conf.setBoolean(strPrefix + strSchema + "IsRecentFirstOrder", false);
+
+ compactEquals(sfCreate(false,
+ 500, 450, 60, 23, 11, 8, 4, 1 ),
+ 500, 450, 60, 23, 11 );
+
+ compactEquals(sfCreate(false,
+ 450, 500, 60, 23, 11, 8, 4, 1 ),
+ 450, 500, 60, 23, 11 );
+
+ compactEquals(sfCreate(false,
+ 1013, 1012, 1011, 1010, 1009, 1008, 1007, 1006, 1005, 1004, 1003, 1002, 1001, 999, 450, 550 ),
+ 999, 450, 550 );
+
+ conf.setLong(strPrefix + strSchema + "MaxCompactSize", 10000);
+
+ compactEquals(sfCreate(false,
+ 1013, 1012, 1011, 1010, 1009, 1008, 1007, 1006, 1005, 1004, 1003, 1002, 1001, 999, 450, 550 ),
+ 1013, 1012, 1011, 1010, 1009 );
+
+ compactEquals(sfCreate(false,
+ 1013, 992, 1011, 1010, 1009, 1008, 1007, 1006, 1005, 1004, 1003, 1002, 1001, 999, 450, 550),
+ 1013, 992, 1011, 1010, 1009 );
+
+ compactEquals(sfCreate(false,
+ 992, 993, 1011, 990, 1009, 998, 1007, 996, 1005, 994, 1003, 992, 1001, 999, 450, 550 ),
+ 992, 993, 1011, 990, 1009 );
+
+ conf.setBoolean(strPrefix + strSchema + "IsRecentFirstOrder", true);
+
+ compactEquals(sfCreate(false,
+ 500, 450, 60, 23, 11, 8, 4, 1 ),
+ 60, 23, 11 );
+
+ compactEquals(sfCreate(false,
+ 450, 500, 60, 23, 11, 8, 4, 1 ),
+ 60, 23, 11 );
+
+ compactEquals(sfCreate(false,
+ 1013, 1012, 1011, 1010, 1009, 1008, 1007, 1006, 1005, 1004, 1003, 1002, 1001, 999, 450, 550 ),
+ 999, 450, 550 );
+
+ compactEquals(sfCreate(false,
+ 992, 993, 1011, 990, 1009, 998, 1007, 996, 1005, 994, 1003, 992, 1001, 999, 450, 550 ),
+ 999, 450, 550 );
+
+ compactEquals(sfCreate(false,
+ 992, 993, 1011, 990, 1009, 998, 1007, 996, 1005, 994, 1003, 992, 991, 999, 450, 550 ),
+ 992, 991, 999, 450, 550 );
+
+ compactEquals(sfCreate(false,
+ 992, 993, 1011, 990, 1009, 998, 1007, 996, 1005, 994, 1003, 992, 991, 999, 450, 550, 1001),
+ 992, 993, 1011, 990, 1009 );
+
+ }
+
+ @Override
+ public void testCompactionRatio() throws IOException {
+ conf.setInt(strPrefix + strSchema + "NumCompactionTiers", 1);
+ conf.setFloat(strPrefix + strSchema + "Tier.0.CompactionRatio", 1.0F);
+ conf.setInt(strPrefix + "max", 5);
+ super.testCompactionRatio();
+ }
+
+ @Override
+ public void testOffPeakCompactionRatio() throws IOException {}
+
+}
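Editorial note: as a worked example of the first case in testAgeBasedAssignment(), every age (8, 5, 4, 2, 1) is within Tier.0.MaxAgeInDisk = 10, so all five files land in tier 0, whose ratio is 0.0 and whose min/max file counts fall back to the family-level defaults set in setUp() (3 and 5). The standalone sketch below is not part of the patch; it ignores the maxCompactSize and tier-boundary details, runs the simplified admissibility test over each possible start index, and confirms that none qualifies, which is why the expected selection is empty.

public class EmptySelectionSketch {
  public static void main(String[] args) {
    long[] sizes = {151, 30, 13, 12, 11};  // oldest to newest, as ordered by the manager
    long minCompactSize = 10;              // MinCompactSize from the test setUp()
    double ratio = 0.0;                    // tier 0 CompactionRatio from the test setUp()
    int minFiles = 3;                      // family-level MinFilesToCompact default

    boolean admissibleFound = false;
    for (int start = 0; start < sizes.length && !admissibleFound; start++) {
      long rest = 0;
      for (int k = start + 1; k < sizes.length; k++) {
        rest += sizes[k];
      }
      int filesInRange = sizes.length - start;
      // Admissible if enough files and the start file is small relative to the rest
      // (or below minCompactSize); with ratio 0.0 the size test can never pass.
      admissibleFound = filesInRange >= minFiles
          && (sizes[start] <= minCompactSize || sizes[start] <= rest * ratio);
    }
    System.out.println("admissible selection found: " + admissibleFound);  // false
  }
}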