From 1abf20084a3c41fc83953fb470940fae4a4644cf Mon Sep 17 00:00:00 2001 From: Zhihong Yu Date: Tue, 30 Oct 2012 22:04:41 +0000 Subject: [PATCH] HBASE-7055 port HBASE-6371 tier-based compaction from 0.89-fb to trunk - revert for further discussion git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1403890 13f79535-47bb-0310-9956-ffa450edef68 --- .../hadoop/hbase/HBaseConfiguration.java | 1 - .../hbase/mapreduce/HFileOutputFormat.java | 2 - .../regionserver/CompactionConfiguration.java | 203 --------- .../hbase/regionserver/CompactionManager.java | 411 ----------------- .../hadoop/hbase/regionserver/Compactor.java | 11 +- .../hadoop/hbase/regionserver/HRegion.java | 2 +- .../hadoop/hbase/regionserver/HStore.java | 416 ++++++++++++++---- .../hadoop/hbase/regionserver/Store.java | 19 - .../hadoop/hbase/regionserver/StoreFile.java | 30 +- .../TierCompactionConfiguration.java | 267 ----------- .../regionserver/TierCompactionManager.java | 256 ----------- .../compactions/CompactSelection.java | 105 +++-- .../compactions/CompactionRequest.java | 6 +- .../src/main/resources/hbase-compactions.xml | 160 ------- ...lection.java => TestCompactSelection.java} | 138 ++---- .../TestTierCompactSelection.java | 318 ------------- 16 files changed, 445 insertions(+), 1900 deletions(-) delete mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionConfiguration.java delete mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionManager.java delete mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/TierCompactionConfiguration.java delete mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/TierCompactionManager.java delete mode 100644 hbase-server/src/main/resources/hbase-compactions.xml rename hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/{TestDefaultCompactSelection.java => TestCompactSelection.java} (74%) delete mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestTierCompactSelection.java diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HBaseConfiguration.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HBaseConfiguration.java index 20ce0d0e0e6..95defd3a00c 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HBaseConfiguration.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HBaseConfiguration.java @@ -98,7 +98,6 @@ public class HBaseConfiguration extends Configuration { public static Configuration addHbaseResources(Configuration conf) { conf.addResource("hbase-default.xml"); conf.addResource("hbase-site.xml"); - conf.addResource("hbase-compactions.xml"); checkDefaultsVersion(conf); checkForClusterFreeMemoryLimit(conf); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java index 9d32edf5891..860d9093925 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java @@ -223,8 +223,6 @@ public class HFileOutputFormat extends FileOutputFormat - * maxCompactSize - upper bound on file size to be included in minor compactions - * minCompactSize - lower bound below which compaction is selected without ratio test - * minFilesToCompact - lower bound on number of files in any minor compaction - * maxFilesToCompact - upper bound on number of files in any minor 
compaction - * compactionRatio - Ratio used for compaction - *

- * Set parameter as "hbase.hstore.compaction." - */ - -//TODO: revisit this class for online parameter updating - -public class CompactionConfiguration { - - static final Log LOG = LogFactory.getLog(CompactionConfiguration.class); - - Configuration conf; - Store store; - - long maxCompactSize; - long minCompactSize; - int minFilesToCompact; - int maxFilesToCompact; - double compactionRatio; - double offPeekCompactionRatio; - int offPeakStartHour; - int offPeakEndHour; - long throttlePoint; - boolean shouldDeleteExpired; - long majorCompactionPeriod; - float majorCompactionJitter; - - CompactionConfiguration(Configuration conf, Store store) { - this.conf = conf; - this.store = store; - - String strPrefix = "hbase.hstore.compaction."; - - maxCompactSize = conf.getLong(strPrefix + "max.size", Long.MAX_VALUE); - minCompactSize = conf.getLong(strPrefix + "min.size", store.getHRegion().memstoreFlushSize); - minFilesToCompact = Math.max(2, conf.getInt(strPrefix + "min", - /*old name*/ conf.getInt("hbase.hstore.compactionThreshold", 3))); - maxFilesToCompact = conf.getInt(strPrefix + "max", 10); - compactionRatio = conf.getFloat(strPrefix + "ratio", 1.2F); - offPeekCompactionRatio = conf.getFloat(strPrefix + "ratio.offpeak", 5.0F); - offPeakStartHour = conf.getInt("hbase.offpeak.start.hour", -1); - offPeakEndHour = conf.getInt("hbase.offpeak.end.hour", -1); - - if (!isValidHour(offPeakStartHour) || !isValidHour(offPeakEndHour)) { - if (!(offPeakStartHour == -1 && offPeakEndHour == -1)) { - LOG.warn("Ignoring invalid start/end hour for peak hour : start = " + - this.offPeakStartHour + " end = " + this.offPeakEndHour + - ". Valid numbers are [0-23]"); - } - this.offPeakStartHour = this.offPeakEndHour = -1; - } - - throttlePoint = conf.getLong("hbase.regionserver.thread.compaction.throttle", - 2 * maxFilesToCompact * store.getHRegion().memstoreFlushSize); - shouldDeleteExpired = conf.getBoolean("hbase.store.delete.expired.storefile", true); - majorCompactionPeriod = conf.getLong(HConstants.MAJOR_COMPACTION_PERIOD, 1000*60*60*24); - majorCompactionJitter = conf.getFloat("hbase.hregion.majorcompaction.jitter", 0.20F); - - LOG.info("Compaction configuration " + this.toString()); - } - - @Override - public String toString() { - return String.format( - "size [%d, %d); files [%d, %d); ratio %f; off-peak ratio %f; off-peak hours %d-%d; " + - "throttle point %d;%s delete expired; major period %d, major jitter %f", - minCompactSize, - maxCompactSize, - minFilesToCompact, - maxFilesToCompact, - compactionRatio, - offPeekCompactionRatio, - offPeakStartHour, - offPeakEndHour, - throttlePoint, - shouldDeleteExpired ? 
"" : " don't", - majorCompactionPeriod, - majorCompactionJitter); - } - - /** - * @return lower bound below which compaction is selected without ratio test - */ - long getMinCompactSize() { - return minCompactSize; - } - - /** - * @return upper bound on file size to be included in minor compactions - */ - long getMaxCompactSize() { - return maxCompactSize; - } - - /** - * @return upper bound on number of files to be included in minor compactions - */ - int getMinFilesToCompact() { - return minFilesToCompact; - } - - /** - * @return upper bound on number of files to be included in minor compactions - */ - int getMaxFilesToCompact() { - return maxFilesToCompact; - } - - /** - * @return Ratio used for compaction - */ - double getCompactionRatio() { - return compactionRatio; - } - - /** - * @return Off peak Ratio used for compaction - */ - double getCompactionRatioOffPeak() { - return offPeekCompactionRatio; - } - - /** - * @return Hour at which off-peak compactions start - */ - int getOffPeakStartHour() { - return offPeakStartHour; - } - - /** - * @return Hour at which off-peak compactions end - */ - int getOffPeakEndHour() { - return offPeakEndHour; - } - - /** - * @return ThrottlePoint used for classifying small and large compactions - */ - long getThrottlePoint() { - return throttlePoint; - } - - /** - * @return Major compaction period from compaction. - * Major compactions are selected periodically according to this parameter plus jitter - */ - long getMajorCompactionPeriod() { - return majorCompactionPeriod; - } - - /** - * @return Major the jitter fraction, the fraction within which the major compaction period is - * randomly chosen from the majorCompactionPeriod in each store. - */ - float getMajorCompactionJitter() { - return majorCompactionJitter; - } - - /** - * @return Whether expired files should be deleted ASAP using compactions - */ - boolean shouldDeleteExpired() { - return shouldDeleteExpired; - } - - private static boolean isValidHour(int hour) { - return (hour >= 0 && hour <= 23); - } -} \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionManager.java deleted file mode 100644 index 716b27ea77a..00000000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionManager.java +++ /dev/null @@ -1,411 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */
-
-package org.apache.hadoop.hbase.regionserver;
-
-import com.google.common.base.Predicate;
-import com.google.common.collect.Collections2;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.regionserver.compactions.CompactSelection;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.util.StringUtils;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Calendar;
-import java.util.GregorianCalendar;
-import java.util.List;
-import java.util.Random;
-
-@InterfaceAudience.Private
-public class CompactionManager {
-
-  private static final Log LOG = LogFactory.getLog(CompactionManager.class);
-  private final static Calendar calendar = new GregorianCalendar();
-
-  private Store store;
-  CompactionConfiguration comConf;
-
-  CompactionManager(Configuration configuration, Store store) {
-    this.store = store;
-    comConf = new CompactionConfiguration(configuration, store);
-  }
-
-  /**
-   * @param candidateFiles candidate files, ordered from oldest to newest
-   * @return subset copy of candidate list that meets compaction criteria
-   * @throws java.io.IOException
-   */
-  CompactSelection selectCompaction(List<StoreFile> candidateFiles, int priority, boolean forceMajor)
-      throws IOException {
-    // Preliminary compaction subject to filters
-    CompactSelection candidateSelection = new CompactSelection(candidateFiles);
-
-    if (!forceMajor) {
-      // If there are expired files, only select them so that compaction deletes them
-      if (comConf.shouldDeleteExpired() && (store.getTtl() != Long.MAX_VALUE)) {
-        CompactSelection expiredSelection = selectExpiredSFs(
-            candidateSelection, EnvironmentEdgeManager.currentTimeMillis() - store.getTtl());
-        if (expiredSelection != null) {
-          return expiredSelection;
-        }
-      }
-      candidateSelection = skipLargeFiles(candidateSelection);
-    }
-
-    // Force a major compaction if this is a user-requested major compaction,
-    // or if we do not have too many files to compact and this was requested
-    // as a major compaction.
-    // Or, if there are any references among the candidates.
-    boolean isUserCompaction = (priority == Store.PRIORITY_USER);
-    boolean majorCompaction = (
-        (forceMajor && isUserCompaction)
-        || ((forceMajor || isMajorCompaction(candidateSelection.getFilesToCompact()))
-            && (candidateSelection.getFilesToCompact().size() < comConf.getMaxFilesToCompact()))
-        || store.hasReferences(candidateSelection.getFilesToCompact())
-    );
-
-    LOG.debug(store.getHRegion().regionInfo.getEncodedName() + " - " +
-        store.getColumnFamilyName() + ": Initiating " +
-        (majorCompaction ? "major" : "minor") + " compaction");
-
-    if (!majorCompaction) {
-      // we're doing a minor compaction, let's see what files are applicable
-      candidateSelection = filterBulk(candidateSelection);
-      candidateSelection = applyCompactionPolicy(candidateSelection);
-      candidateSelection = checkMinFilesCriteria(candidateSelection);
-    }
-    candidateSelection = removeExcessFiles(candidateSelection, isUserCompaction, majorCompaction);
-    return candidateSelection;
-  }
-
-  /**
-   * Select the expired store files to compact
-   *
-   * @param candidates the initial set of storeFiles
-   * @param maxExpiredTimeStamp
-   *          The store file will be marked as expired if its max time stamp is
-   *          less than this maxExpiredTimeStamp.
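
The expiry rule just described reduces to one timestamp comparison. A self-contained sketch of the criterion, with the store file stubbed out as a plain long (the real method below reads the max timestamp from each file's reader):

    public class ExpirySketch {
      // A file is expired when its newest cell is older than (now - ttl),
      // i.e. maxTimestamp < maxExpiredTimeStamp.
      static boolean isExpired(long fileMaxTimestamp, long now, long ttlMillis) {
        long maxExpiredTimeStamp = now - ttlMillis;
        return fileMaxTimestamp < maxExpiredTimeStamp;
      }

      public static void main(String[] args) {
        long now = System.currentTimeMillis();
        long sevenDays = 7L * 24 * 60 * 60 * 1000; // hypothetical TTL
        System.out.println(isExpired(now - 8L * 24 * 60 * 60 * 1000, now, sevenDays)); // true
        System.out.println(isExpired(now - 60000L, now, sevenDays));                   // false
      }
    }
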
- * @return A CompactSelection contains the expired store files as - * filesToCompact - */ - private CompactSelection selectExpiredSFs - (CompactSelection candidates, long maxExpiredTimeStamp) { - List filesToCompact = candidates.getFilesToCompact(); - if (filesToCompact == null || filesToCompact.size() == 0) - return null; - ArrayList expiredStoreFiles = null; - boolean hasExpiredStoreFiles = false; - CompactSelection expiredSFSelection = null; - - for (StoreFile storeFile : filesToCompact) { - if (storeFile.getReader().getMaxTimestamp() < maxExpiredTimeStamp) { - LOG.info("Deleting the expired store file by compaction: " - + storeFile.getPath() + " whose maxTimeStamp is " - + storeFile.getReader().getMaxTimestamp() - + " while the max expired timestamp is " + maxExpiredTimeStamp); - if (!hasExpiredStoreFiles) { - expiredStoreFiles = new ArrayList(); - hasExpiredStoreFiles = true; - } - expiredStoreFiles.add(storeFile); - } - } - - if (hasExpiredStoreFiles) { - expiredSFSelection = new CompactSelection(expiredStoreFiles); - } - return expiredSFSelection; - } - - /** - * @param candidates pre-filtrate - * @return filtered subset - * exclude all files above maxCompactSize - * Also save all references. We MUST compact them - */ - private CompactSelection skipLargeFiles(CompactSelection candidates) { - int pos = 0; - while (pos < candidates.getFilesToCompact().size() && - candidates.getFilesToCompact().get(pos).getReader().length() > - comConf.getMaxCompactSize() && - !candidates.getFilesToCompact().get(pos).isReference()) { - ++pos; - } - if (pos > 0) { - LOG.debug("Some files are too large. Excluding " + pos + " files from compaction candidates"); - candidates.clearSubList(0, pos); - } - return candidates; - } - - /** - * @param candidates pre-filtrate - * @return filtered subset - * exclude all bulk load files if configured - */ - private CompactSelection filterBulk(CompactSelection candidates) { - candidates.getFilesToCompact().removeAll(Collections2.filter( - candidates.getFilesToCompact(), - new Predicate() { - @Override - public boolean apply(StoreFile input) { - return input.excludeFromMinorCompaction(); - } - })); - return candidates; - } - - /** - * @param candidates pre-filtrate - * @return filtered subset - * take upto maxFilesToCompact from the start - */ - private CompactSelection removeExcessFiles(CompactSelection candidates, - boolean isUserCompaction, boolean isMajorCompaction) { - int excess = candidates.getFilesToCompact().size() - comConf.getMaxFilesToCompact(); - if (excess > 0) { - if (isMajorCompaction && isUserCompaction) { - LOG.debug("Warning, compacting more than " + comConf.getMaxFilesToCompact() + - " files because of a user-requested major compaction"); - } else { - LOG.debug("Too many admissible files. Excluding " + excess - + " files from compaction candidates"); - candidates.clearSubList(comConf.getMaxFilesToCompact(), - candidates.getFilesToCompact().size()); - } - } - return candidates; - } - /** - * @param candidates pre-filtrate - * @return filtered subset - * forget the compactionSelection if we don't have enough files - */ - private CompactSelection checkMinFilesCriteria(CompactSelection candidates) { - int minFiles = comConf.getMinFilesToCompact(); - if (candidates.getFilesToCompact().size() < minFiles) { - if(LOG.isDebugEnabled()) { - LOG.debug("Not compacting files because we only have " + - candidates.getFilesToCompact().size() + - " files ready for compaction. 
Need " + minFiles + " to initiate."); - } - candidates.emptyFileList(); - } - return candidates; - } - - /** - * @param candidates pre-filtrate - * @return filtered subset - * -- Default minor compaction selection algorithm: Choose CompactSelection from candidates -- - * First exclude bulk-load files if indicated in configuration. - * Start at the oldest file and stop when you find the first file that - * meets compaction criteria: - * (1) a recently-flushed, small file (i.e. <= minCompactSize) - * OR - * (2) within the compactRatio of sum(newer_files) - * Given normal skew, any newer files will also meet this criteria - *

- * Additional Note: - * If fileSizes.size() >> maxFilesToCompact, we will recurse on - * compact(). Consider the oldest files first to avoid a - * situation where we always compact [end-threshold,end). Then, the - * last file becomes an aggregate of the previous compactions. - * - * normal skew: - * - * older ----> newer (increasing seqID) - * _ - * | | _ - * | | | | _ - * --|-|- |-|- |-|---_-------_------- minCompactSize - * | | | | | | | | _ | | - * | | | | | | | | | | | | - * | | | | | | | | | | | | - */ - CompactSelection applyCompactionPolicy(CompactSelection candidates) throws IOException { - if (candidates.getFilesToCompact().isEmpty()) { - return candidates; - } - - // we're doing a minor compaction, let's see what files are applicable - int start = 0; - double ratio = comConf.getCompactionRatio(); - if (isOffPeakHour() && candidates.trySetOffpeak()) { - ratio = comConf.getCompactionRatioOffPeak(); - LOG.info("Running an off-peak compaction, selection ratio = " + ratio - + ", numOutstandingOffPeakCompactions is now " - + CompactSelection.getNumOutStandingOffPeakCompactions()); - } - - // get store file sizes for incremental compacting selection. - int countOfFiles = candidates.getFilesToCompact().size(); - long[] fileSizes = new long[countOfFiles]; - long[] sumSize = new long[countOfFiles]; - for (int i = countOfFiles - 1; i >= 0; --i) { - StoreFile file = candidates.getFilesToCompact().get(i); - fileSizes[i] = file.getReader().length(); - // calculate the sum of fileSizes[i,i+maxFilesToCompact-1) for algo - int tooFar = i + comConf.getMaxFilesToCompact() - 1; - sumSize[i] = fileSizes[i] - + ((i + 1 < countOfFiles) ? sumSize[i + 1] : 0) - - ((tooFar < countOfFiles) ? fileSizes[tooFar] : 0); - } - - - while (countOfFiles - start >= comConf.getMinFilesToCompact() && - fileSizes[start] > Math.max(comConf.getMinCompactSize(), (long) (sumSize[start + 1] * ratio))) { - ++start; - } - if (start < countOfFiles) { - LOG.info("Default compaction algorithm has selected " + (countOfFiles - start) - + " files from " + countOfFiles + " candidates"); - } - - candidates = candidates.getSubList(start, countOfFiles); - - return candidates; - } - - /* - * @param filesToCompact Files to compact. Can be null. - * @return True if we should run a major compaction. - */ - boolean isMajorCompaction(final List filesToCompact) throws IOException { - boolean result = false; - long mcTime = getNextMajorCompactTime(); - if (filesToCompact == null || filesToCompact.isEmpty() || mcTime == 0) { - return result; - } - // TODO: Use better method for determining stamp of last major (HBASE-2990) - long lowTimestamp = getLowestTimestamp(filesToCompact); - long now = System.currentTimeMillis(); - if (lowTimestamp > 0l && lowTimestamp < (now - mcTime)) { - // Major compaction time has elapsed. - if (filesToCompact.size() == 1) { - // Single file - StoreFile sf = filesToCompact.get(0); - long oldest = - (sf.getReader().timeRangeTracker == null) ? 
- Long.MIN_VALUE : - now - sf.getReader().timeRangeTracker.minimumTimestamp; - if (sf.isMajorCompaction() && - (store.getTtl() == HConstants.FOREVER || oldest < store.getTtl())) { - if (LOG.isDebugEnabled()) { - LOG.debug("Skipping major compaction of " + this + - " because one (major) compacted file only and oldestTime " + - oldest + "ms is < ttl=" + store.getTtl()); - } - } else if (store.getTtl() != HConstants.FOREVER && oldest > store.getTtl()) { - LOG.debug("Major compaction triggered on store " + this + - ", because keyvalues outdated; time since last major compaction " + - (now - lowTimestamp) + "ms"); - result = true; - } - } else { - if (LOG.isDebugEnabled()) { - LOG.debug("Major compaction triggered on store " + this + - "; time since last major compaction " + (now - lowTimestamp) + "ms"); - } - result = true; - } - } - return result; - } - - long getNextMajorCompactTime() { - // default = 24hrs - long ret = comConf.getMajorCompactionPeriod(); - String strCompactionTime = store.getFamily().getValue(HConstants.MAJOR_COMPACTION_PERIOD); - if (strCompactionTime != null) { - ret = (new Long(strCompactionTime)).longValue(); - } - - if (ret > 0) { - // default = 20% = +/- 4.8 hrs - double jitterPct = comConf.getMajorCompactionJitter(); - if (jitterPct > 0) { - long jitter = Math.round(ret * jitterPct); - // deterministic jitter avoids a major compaction storm on restart - Integer seed = store.getDeterministicRandomSeed(); - if (seed != null) { - double rnd = (new Random(seed)).nextDouble(); - ret += jitter - Math.round(2L * jitter * rnd); - } else { - ret = 0; // no storefiles == no major compaction - } - } - } - return ret; - } - - /* - * Gets lowest timestamp from candidate StoreFiles - * - * @param fs - * @param dir - * @throws IOException - */ - static long getLowestTimestamp(final List candidates) - throws IOException { - long minTs = Long.MAX_VALUE; - for (StoreFile storeFile : candidates) { - minTs = Math.min(minTs, storeFile.getModificationTimeStamp()); - } - return minTs; - } - - /** - * @param compactionSize Total size of some compaction - * @return whether this should be a large or small compaction - */ - boolean throttleCompaction(long compactionSize) { - return compactionSize > comConf.getThrottlePoint(); - } - - /** - * @param numCandidates Number of candidate store files - * @return whether a compactionSelection is possible - */ - boolean needsCompaction(int numCandidates) { - return numCandidates > comConf.getMinFilesToCompact(); - } - - /** - * @return whether this is off-peak hour - */ - private boolean isOffPeakHour() { - int currentHour = calendar.get(Calendar.HOUR_OF_DAY); - int startHour = comConf.getOffPeakStartHour(); - int endHour = comConf.getOffPeakEndHour(); - // If offpeak time checking is disabled just return false. 
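
The hour check that follows supports off-peak windows that wrap past midnight (start 22, end 6, say). A standalone sketch of the same three cases:

    public class OffPeakSketch {
      // Mirrors isOffPeakHour here: equal start/end means disabled; start < end is a
      // plain interval; start > end wraps around midnight.
      static boolean isOffPeak(int currentHour, int startHour, int endHour) {
        if (startHour == endHour) {
          return false;
        }
        if (startHour < endHour) {
          return currentHour >= startHour && currentHour < endHour;
        }
        return currentHour >= startHour || currentHour < endHour;
      }

      public static void main(String[] args) {
        System.out.println(isOffPeak(23, 22, 6)); // true: inside the wrapped window
        System.out.println(isOffPeak(7, 22, 6));  // false: just after it closes
      }
    }
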
-    if (startHour == endHour) {
-      return false;
-    }
-    if (startHour < endHour) {
-      return (currentHour >= startHour && currentHour < endHour);
-    }
-    return (currentHour >= startHour || currentHour < endHour);
-  }
-}
\ No newline at end of file
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Compactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Compactor.java
index c0748cd9ed9..a526345e18c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Compactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Compactor.java
@@ -34,7 +34,6 @@ import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.hfile.Compression;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.util.StringUtils;
 
 /**
@@ -65,15 +64,11 @@ class Compactor extends Configured {
       final Collection<StoreFile> filesToCompact,
       final boolean majorCompaction, final long maxId)
       throws IOException {
-    // Calculate maximum key count after compaction (for blooms), and minFlushTime after compaction
+    // Calculate maximum key count after compaction (for blooms)
     // Also calculate earliest put timestamp if major compaction
     int maxKeyCount = 0;
-    long minFlushTime = Long.MAX_VALUE;
     long earliestPutTs = HConstants.LATEST_TIMESTAMP;
     for (StoreFile file: filesToCompact) {
-      if (file.hasMinFlushTime() && file.getMinFlushTime() < minFlushTime) {
-        minFlushTime = file.getMinFlushTime();
-      }
       StoreFile.Reader r = file.getReader();
       if (r == null) {
        LOG.warn("Null reader for " + file.getPath());
@@ -199,10 +194,6 @@ class Compactor extends Configured {
       }
     } finally {
       if (writer != null) {
-        if (minFlushTime == Long.MAX_VALUE) {
-          minFlushTime = StoreFile.NO_MIN_FLUSH_TIME;
-        }
-        writer.appendFileInfo(StoreFile.MIN_FLUSH_TIME, Bytes.toBytes(minFlushTime));
         writer.appendMetadata(maxId, majorCompaction);
         writer.close();
       }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 529865d66a1..a91fc490fdd 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -4176,7 +4176,7 @@ public class HRegion implements HeapSize { // , Writable{
   }
 
   /**
-   * @return True if needs a major compaction.
+   * @return True if needs a major compaction.
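
getNextMajorCompactTime above spreads major compactions by adding a deterministic jitter: with period p and jitter fraction j, each store picks a value in roughly (p - p*j, p + p*j], stable across restarts because the Random is seeded per store. A self-contained sketch of the arithmetic:

    import java.util.Random;

    public class MajorCompactionJitterSketch {
      static long nextMajorCompactTime(long periodMillis, float jitterPct, int storeSeed) {
        long jitter = Math.round(periodMillis * jitterPct);
        double rnd = new Random(storeSeed).nextDouble(); // deterministic per store
        return periodMillis + jitter - Math.round(2L * jitter * rnd);
      }

      public static void main(String[] args) {
        long day = 24L * 60 * 60 * 1000;
        // 20% jitter: anywhere from ~19.2h to ~28.8h, but always the same for seed 42
        System.out.println(nextMajorCompactTime(day, 0.20F, 42));
      }
    }
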
* @throws IOException */ boolean isMajorCompaction() throws IOException { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index 2000d318870..32d83f95c5a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.regionserver; import java.io.IOException; -import java.lang.reflect.InvocationTargetException; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -64,10 +63,9 @@ import org.apache.hadoop.hbase.io.hfile.HFileScanner; import org.apache.hadoop.hbase.io.hfile.InvalidHFileException; import org.apache.hadoop.hbase.io.hfile.NoOpDataBlockEncoder; import org.apache.hadoop.hbase.monitoring.MonitoredTask; +import org.apache.hadoop.hbase.regionserver.compactions.CompactSelection; import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress; import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; -import org.apache.hadoop.hbase.regionserver.compactions.CompactSelection; -import org.apache.hadoop.hbase.regionserver.CompactionManager; import org.apache.hadoop.hbase.regionserver.metrics.SchemaConfigured; import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics; import org.apache.hadoop.hbase.util.Bytes; @@ -79,6 +77,8 @@ import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.util.StringUtils; import com.google.common.base.Preconditions; +import com.google.common.base.Predicate; +import com.google.common.collect.Collections2; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; @@ -108,24 +108,21 @@ import com.google.common.collect.Lists; @InterfaceAudience.Private public class HStore extends SchemaConfigured implements Store { static final Log LOG = LogFactory.getLog(HStore.class); - - /** Parameter name for what compaction manager to use. */ - private static final String COMPACTION_MANAGER_CLASS = "hbase.compactionmanager.class"; - - /** Default compaction manager class name. */ - private static final String DEFAULT_COMPACTION_MANAGER_CLASS = CompactionManager.class.getName(); protected final MemStore memstore; // This stores directory in the filesystem. private final Path homedir; private final HRegion region; private final HColumnDescriptor family; - CompactionManager compactionManager; final FileSystem fs; final Configuration conf; final CacheConfig cacheConf; - // ttl in milliseconds. TODO: can this be removed? Already stored in scanInfo. + // ttl in milliseconds. 
private long ttl; + private final int minFilesToCompact; + private final int maxFilesToCompact; + private final long minCompactSize; + private final long maxCompactSize; private long lastCompactSize = 0; volatile boolean forceMajor = false; /* how many bytes to write between status checks */ @@ -200,7 +197,7 @@ public class HStore extends SchemaConfigured implements Store { this.comparator = info.getComparator(); // Get TTL - this.ttl = determineTTLFromFamily(family); + this.ttl = getTTL(family); // used by ScanQueryMatcher long timeToPurgeDeletes = Math.max(conf.getLong("hbase.hstore.time.to.purge.deletes", 0), 0); @@ -211,11 +208,23 @@ public class HStore extends SchemaConfigured implements Store { scanInfo = new ScanInfo(family, ttl, timeToPurgeDeletes, this.comparator); this.memstore = new MemStore(conf, this.comparator); + // By default, compact if storefile.count >= minFilesToCompact + this.minFilesToCompact = Math.max(2, + conf.getInt("hbase.hstore.compaction.min", + /*old name*/ conf.getInt("hbase.hstore.compactionThreshold", 3))); + LOG.info("hbase.hstore.compaction.min = " + this.minFilesToCompact); + // Setting up cache configuration for this family this.cacheConf = new CacheConfig(conf, family); this.blockingStoreFileCount = conf.getInt("hbase.hstore.blockingStoreFiles", 7); + this.maxFilesToCompact = conf.getInt("hbase.hstore.compaction.max", 10); + this.minCompactSize = conf.getLong("hbase.hstore.compaction.min.size", + this.region.memstoreFlushSize); + this.maxCompactSize + = conf.getLong("hbase.hstore.compaction.max.size", Long.MAX_VALUE); + this.verifyBulkLoads = conf.getBoolean("hbase.hstore.bulkload.verify", false); if (HStore.closeCheckInterval == 0) { @@ -230,53 +239,13 @@ public class HStore extends SchemaConfigured implements Store { this.bytesPerChecksum = getBytesPerChecksum(conf); // Create a compaction tool instance this.compactor = new Compactor(this.conf); - - setCompactionPolicy(conf.get(COMPACTION_MANAGER_CLASS, DEFAULT_COMPACTION_MANAGER_CLASS)); } - /** - * This setter is used for unit testing - * TODO: Fix this for online configuration updating - */ - void setCompactionPolicy(String managerClassName) { - try { - Class managerClass = - (Class) Class.forName(managerClassName); - compactionManager = managerClass.getDeclaredConstructor( - new Class[] {Configuration.class, Store.class } ).newInstance( - new Object[] { conf, this } ); - } catch (ClassNotFoundException e) { - throw new UnsupportedOperationException( - "Unable to find region server interface " + managerClassName, e); - } catch (IllegalAccessException e) { - throw new UnsupportedOperationException( - "Unable to access specified class " + managerClassName, e); - } catch (InstantiationException e) { - throw new UnsupportedOperationException( - "Unable to instantiate specified class " + managerClassName, e); - } catch (InvocationTargetException e) { - throw new UnsupportedOperationException( - "Unable to invoke specified target class constructor " + managerClassName, e); - } catch (NoSuchMethodException e) { - throw new UnsupportedOperationException( - "Unable to find suitable constructor for class " + managerClassName, e); - } - } - - @Override - public Integer getDeterministicRandomSeed() { - ImmutableList snapshot = storefiles; - if (snapshot != null && !snapshot.isEmpty()) { - return snapshot.get(0).getPath().getName().hashCode(); - } - return null; - } - /** * @param family * @return */ - private static long determineTTLFromFamily(final HColumnDescriptor family) { + long getTTL(final 
HColumnDescriptor family) { // HCD.getTimeToLive returns ttl in seconds. Convert to milliseconds. long ttl = family.getTimeToLive(); if (ttl == HConstants.FOREVER) { @@ -311,11 +280,6 @@ public class HStore extends SchemaConfigured implements Store { return this.fs; } - public long getTtl() { - // TTL only applies if there's no MIN_VERSIONs setting on the column. - return (this.scanInfo.getMinVersions() == 0) ? this.ttl : Long.MAX_VALUE; - } - /** * Returns the configured bytesPerChecksum value. * @param conf The configuration @@ -807,11 +771,8 @@ public class HStore extends SchemaConfigured implements Store { } while (hasMore); } finally { // Write out the log sequence number that corresponds to this output - // hfile. Also write current time in metadata as minFlushTime. - // The hfile is current up to and including logCacheFlushId. + // hfile. The hfile is current up to and including logCacheFlushId. status.setStatus("Flushing " + this + ": appending metadata"); - writer.appendFileInfo(StoreFile.MIN_FLUSH_TIME, - Bytes.toBytes(EnvironmentEdgeManager.currentTimeMillis())); writer.appendMetadata(logCacheFlushId, false); status.setStatus("Flushing " + this + ": closing flushed file"); writer.close(); @@ -1053,12 +1014,12 @@ public class HStore extends SchemaConfigured implements Store { // Ready to go. Have list of files to compact. LOG.info("Starting compaction of " + filesToCompact.size() + " file(s) in " - + this + " of " + this.region.getRegionInfo().getRegionNameAsString() + + this + " of " + + this.region.getRegionInfo().getRegionNameAsString() + " into tmpdir=" + region.getTmpDir() + ", seqid=" + maxId + ", totalSize=" + StringUtils.humanReadableInt(cr.getSize())); StoreFile sf = null; - long compactionStartTime = EnvironmentEdgeManager.currentTimeMillis(); try { StoreFile.Writer writer = this.compactor.compact(this, filesToCompact, cr.isMajor(), maxId); @@ -1087,11 +1048,8 @@ public class HStore extends SchemaConfigured implements Store { (sf == null ? "none" : sf.getPath().getName()) + ", size=" + (sf == null ? "none" : StringUtils.humanReadableInt(sf.getReader().length())) - + "; total size for store is " + StringUtils.humanReadableInt(storeSize) - + ". This selection was in queue for " - + StringUtils.formatTimeDiff(compactionStartTime, cr.getSelectionTime()) + ", and took " - + StringUtils.formatTimeDiff(EnvironmentEdgeManager.currentTimeMillis(), compactionStartTime) - + " to execute."); + + "; total size for store is " + + StringUtils.humanReadableInt(storeSize)); return sf; } @@ -1149,8 +1107,11 @@ public class HStore extends SchemaConfigured implements Store { return hasReferences(this.storefiles); } - @Override - public boolean hasReferences(Collection files) { + /* + * @param files + * @return True if any of the files in files are References. 
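
getTTL above normalizes the schema value: HColumnDescriptor stores TTL in seconds, the store works in milliseconds, and "no TTL" becomes Long.MAX_VALUE. A sketch of that conversion, assuming only the branches visible in the method's comment (HConstants.FOREVER is Integer.MAX_VALUE):

    public class TtlSketch {
      static final int FOREVER = Integer.MAX_VALUE; // stands in for HConstants.FOREVER

      // Seconds from the column descriptor -> milliseconds used by the store.
      static long ttlMillis(int ttlSeconds) {
        if (ttlSeconds == FOREVER) {
          return Long.MAX_VALUE; // no TTL: nothing ever expires
        }
        return ttlSeconds * 1000L;
      }

      public static void main(String[] args) {
        System.out.println(ttlMillis(FOREVER)); // 9223372036854775807
        System.out.println(ttlMillis(86400));   // one day = 86400000 ms
      }
    }
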
+ */ + private boolean hasReferences(Collection files) { if (files != null && files.size() > 0) { for (StoreFile hsf: files) { if (hsf.isReference()) { @@ -1161,6 +1122,22 @@ public class HStore extends SchemaConfigured implements Store { return false; } + /* + * Gets lowest timestamp from candidate StoreFiles + * + * @param fs + * @param dir + * @throws IOException + */ + public static long getLowestTimestamp(final List candidates) + throws IOException { + long minTs = Long.MAX_VALUE; + for (StoreFile storeFile : candidates) { + minTs = Math.min(minTs, storeFile.getModificationTimeStamp()); + } + return minTs; + } + @Override public CompactionProgress getCompactionProgress() { return this.compactor.getProgress(); @@ -1176,7 +1153,91 @@ public class HStore extends SchemaConfigured implements Store { } List candidates = new ArrayList(this.storefiles); - return compactionManager.isMajorCompaction(candidates); + + // exclude files above the max compaction threshold + // except: save all references. we MUST compact them + int pos = 0; + while (pos < candidates.size() && + candidates.get(pos).getReader().length() > this.maxCompactSize && + !candidates.get(pos).isReference()) ++pos; + candidates.subList(0, pos).clear(); + + return isMajorCompaction(candidates); + } + + /* + * @param filesToCompact Files to compact. Can be null. + * @return True if we should run a major compaction. + */ + private boolean isMajorCompaction(final List filesToCompact) throws IOException { + boolean result = false; + long mcTime = getNextMajorCompactTime(); + if (filesToCompact == null || filesToCompact.isEmpty() || mcTime == 0) { + return result; + } + // TODO: Use better method for determining stamp of last major (HBASE-2990) + long lowTimestamp = getLowestTimestamp(filesToCompact); + long now = System.currentTimeMillis(); + if (lowTimestamp > 0l && lowTimestamp < (now - mcTime)) { + // Major compaction time has elapsed. + if (filesToCompact.size() == 1) { + // Single file + StoreFile sf = filesToCompact.get(0); + long oldest = + (sf.getReader().timeRangeTracker == null) ? 
+ Long.MIN_VALUE : + now - sf.getReader().timeRangeTracker.minimumTimestamp; + if (sf.isMajorCompaction() && + (this.ttl == HConstants.FOREVER || oldest < this.ttl)) { + if (LOG.isDebugEnabled()) { + LOG.debug("Skipping major compaction of " + this + + " because one (major) compacted file only and oldestTime " + + oldest + "ms is < ttl=" + this.ttl); + } + } else if (this.ttl != HConstants.FOREVER && oldest > this.ttl) { + LOG.debug("Major compaction triggered on store " + this + + ", because keyvalues outdated; time since last major compaction " + + (now - lowTimestamp) + "ms"); + result = true; + } + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("Major compaction triggered on store " + this + + "; time since last major compaction " + (now - lowTimestamp) + "ms"); + } + result = true; + } + } + return result; + } + + long getNextMajorCompactTime() { + // default = 24hrs + long ret = conf.getLong(HConstants.MAJOR_COMPACTION_PERIOD, 1000*60*60*24); + if (family.getValue(HConstants.MAJOR_COMPACTION_PERIOD) != null) { + String strCompactionTime = + family.getValue(HConstants.MAJOR_COMPACTION_PERIOD); + ret = (new Long(strCompactionTime)).longValue(); + } + + if (ret > 0) { + // default = 20% = +/- 4.8 hrs + double jitterPct = conf.getFloat("hbase.hregion.majorcompaction.jitter", + 0.20F); + if (jitterPct > 0) { + long jitter = Math.round(ret * jitterPct); + // deterministic jitter avoids a major compaction storm on restart + ImmutableList snapshot = storefiles; + if (snapshot != null && !snapshot.isEmpty()) { + String seed = snapshot.get(0).getPath().getName(); + double curRand = new Random(seed.hashCode()).nextDouble(); + ret += jitter - Math.round(2L * jitter * curRand); + } else { + ret = 0; // no storefiles == no major compaction + } + } + } + return ret; } public CompactionRequest requestCompaction() throws IOException { @@ -1212,10 +1273,9 @@ public class HStore extends SchemaConfigured implements Store { CompactSelection filesToCompact; if (override) { // coprocessor is overriding normal file selection - filesToCompact = new CompactSelection(candidates); + filesToCompact = new CompactSelection(conf, candidates); } else { - filesToCompact = compactionManager.selectCompaction(candidates, priority, - forceMajor && filesCompacting.isEmpty()); + filesToCompact = compactSelection(candidates, priority); } if (region.getCoprocessorHost() != null) { @@ -1265,6 +1325,191 @@ public class HStore extends SchemaConfigured implements Store { } } + /** + * Algorithm to choose which files to compact, see {@link #compactSelection(java.util.List, int)} + * @param candidates + * @return + * @throws IOException + */ + CompactSelection compactSelection(List candidates) throws IOException { + return compactSelection(candidates,Store.NO_PRIORITY); + } + + /** + * Algorithm to choose which files to compact + * + * Configuration knobs: + * "hbase.hstore.compaction.ratio" + * normal case: minor compact when file <= sum(smaller_files) * ratio + * "hbase.hstore.compaction.min.size" + * unconditionally compact individual files below this size + * "hbase.hstore.compaction.max.size" + * never compact individual files above this size (unless splitting) + * "hbase.hstore.compaction.min" + * min files needed to minor compact + * "hbase.hstore.compaction.max" + * max files to compact at once (avoids OOM) + * + * @param candidates candidate files, ordered from oldest to newest + * @return subset copy of candidate list that meets compaction criteria + * @throws IOException + */ + CompactSelection compactSelection(List 
candidates, int priority) + throws IOException { + // ASSUMPTION!!! filesCompacting is locked when calling this function + + /* normal skew: + * + * older ----> newer + * _ + * | | _ + * | | | | _ + * --|-|- |-|- |-|---_-------_------- minCompactSize + * | | | | | | | | _ | | + * | | | | | | | | | | | | + * | | | | | | | | | | | | + */ + CompactSelection compactSelection = new CompactSelection(conf, candidates); + + boolean forcemajor = this.forceMajor && filesCompacting.isEmpty(); + if (!forcemajor) { + // Delete the expired store files before the compaction selection. + if (conf.getBoolean("hbase.store.delete.expired.storefile", true) + && (ttl != Long.MAX_VALUE) && (this.scanInfo.minVersions == 0)) { + CompactSelection expiredSelection = compactSelection + .selectExpiredStoreFilesToCompact( + EnvironmentEdgeManager.currentTimeMillis() - this.ttl); + + // If there is any expired store files, delete them by compaction. + if (expiredSelection != null) { + return expiredSelection; + } + } + // do not compact old files above a configurable threshold + // save all references. we MUST compact them + int pos = 0; + while (pos < compactSelection.getFilesToCompact().size() && + compactSelection.getFilesToCompact().get(pos).getReader().length() + > maxCompactSize && + !compactSelection.getFilesToCompact().get(pos).isReference()) ++pos; + if (pos != 0) compactSelection.clearSubList(0, pos); + } + + if (compactSelection.getFilesToCompact().isEmpty()) { + LOG.debug(this.getHRegionInfo().getEncodedName() + " - " + + this + ": no store files to compact"); + compactSelection.emptyFileList(); + return compactSelection; + } + + // Force a major compaction if this is a user-requested major compaction, + // or if we do not have too many files to compact and this was requested + // as a major compaction + boolean majorcompaction = (forcemajor && priority == Store.PRIORITY_USER) || + (forcemajor || isMajorCompaction(compactSelection.getFilesToCompact())) && + (compactSelection.getFilesToCompact().size() < this.maxFilesToCompact + ); + LOG.debug(this.getHRegionInfo().getEncodedName() + " - " + + this.getColumnFamilyName() + ": Initiating " + + (majorcompaction ? "major" : "minor") + "compaction"); + + if (!majorcompaction && + !hasReferences(compactSelection.getFilesToCompact())) { + // we're doing a minor compaction, let's see what files are applicable + int start = 0; + double r = compactSelection.getCompactSelectionRatio(); + + // remove bulk import files that request to be excluded from minors + compactSelection.getFilesToCompact().removeAll(Collections2.filter( + compactSelection.getFilesToCompact(), + new Predicate() { + public boolean apply(StoreFile input) { + return input.excludeFromMinorCompaction(); + } + })); + + // skip selection algorithm if we don't have enough files + if (compactSelection.getFilesToCompact().size() < this.minFilesToCompact) { + if(LOG.isDebugEnabled()) { + LOG.debug("Not compacting files because we only have " + + compactSelection.getFilesToCompact().size() + + " files ready for compaction. Need " + this.minFilesToCompact + " to initiate."); + } + compactSelection.emptyFileList(); + return compactSelection; + } + + /* TODO: add sorting + unit test back in when HBASE-2856 is fixed + // Sort files by size to correct when normal skew is altered by bulk load. + Collections.sort(filesToCompact, StoreFile.Comparators.FILE_SIZE); + */ + + // get store file sizes for incremental compacting selection. 
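
The loop that follows fills sumSize back to front as a sliding window: sumSize[i] totals fileSizes over the half-open range [i, i + maxFilesToCompact - 1), maintained in O(1) per step by adding entry i and dropping the entry that falls out of the window. The same computation, standalone, with hypothetical sizes:

    import java.util.Arrays;

    public class SlidingSumSketch {
      static long[] slidingSums(long[] fileSizes, int maxFilesToCompact) {
        int n = fileSizes.length;
        long[] sumSize = new long[n];
        for (int i = n - 1; i >= 0; --i) {
          int tooFar = i + maxFilesToCompact - 1; // first index past the window
          sumSize[i] = fileSizes[i]
              + ((i + 1 < n) ? sumSize[i + 1] : 0)
              - ((tooFar < n) ? fileSizes[tooFar] : 0);
        }
        return sumSize;
      }

      public static void main(String[] args) {
        // four files, oldest to newest, window covering at most two entries (max = 3)
        long[] sums = slidingSums(new long[] {400, 300, 200, 100}, 3);
        System.out.println(Arrays.toString(sums)); // [700, 500, 300, 100]
      }
    }
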
+ int countOfFiles = compactSelection.getFilesToCompact().size(); + long [] fileSizes = new long[countOfFiles]; + long [] sumSize = new long[countOfFiles]; + for (int i = countOfFiles-1; i >= 0; --i) { + StoreFile file = compactSelection.getFilesToCompact().get(i); + fileSizes[i] = file.getReader().length(); + // calculate the sum of fileSizes[i,i+maxFilesToCompact-1) for algo + int tooFar = i + this.maxFilesToCompact - 1; + sumSize[i] = fileSizes[i] + + ((i+1 < countOfFiles) ? sumSize[i+1] : 0) + - ((tooFar < countOfFiles) ? fileSizes[tooFar] : 0); + } + + /* Start at the oldest file and stop when you find the first file that + * meets compaction criteria: + * (1) a recently-flushed, small file (i.e. <= minCompactSize) + * OR + * (2) within the compactRatio of sum(newer_files) + * Given normal skew, any newer files will also meet this criteria + * + * Additional Note: + * If fileSizes.size() >> maxFilesToCompact, we will recurse on + * compact(). Consider the oldest files first to avoid a + * situation where we always compact [end-threshold,end). Then, the + * last file becomes an aggregate of the previous compactions. + */ + while(countOfFiles - start >= this.minFilesToCompact && + fileSizes[start] > + Math.max(minCompactSize, (long)(sumSize[start+1] * r))) { + ++start; + } + int end = Math.min(countOfFiles, start + this.maxFilesToCompact); + long totalSize = fileSizes[start] + + ((start+1 < countOfFiles) ? sumSize[start+1] : 0); + compactSelection = compactSelection.getSubList(start, end); + + // if we don't have enough files to compact, just wait + if (compactSelection.getFilesToCompact().size() < this.minFilesToCompact) { + if (LOG.isDebugEnabled()) { + LOG.debug("Skipped compaction of " + this + + ". Only " + (end - start) + " file(s) of size " + + StringUtils.humanReadableInt(totalSize) + + " have met compaction criteria."); + } + compactSelection.emptyFileList(); + return compactSelection; + } + } else { + if(majorcompaction) { + if (compactSelection.getFilesToCompact().size() > this.maxFilesToCompact) { + LOG.debug("Warning, compacting more than " + this.maxFilesToCompact + + " files, probably because of a user-requested major compaction"); + if(priority != Store.PRIORITY_USER) { + LOG.error("Compacting more than max files on a non user-requested compaction"); + } + } + } else if (compactSelection.getFilesToCompact().size() > this.maxFilesToCompact) { + // all files included in this compaction, up to max + int pastMax = compactSelection.getFilesToCompact().size() - this.maxFilesToCompact; + compactSelection.getFilesToCompact().subList(0, pastMax).clear(); + } + } + return compactSelection; + } + /** * Validates a store file by opening and closing it. In HFileV2 this should * not be an expensive operation. 
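
The while loop above is the heart of the default policy: starting from the oldest file, skip any file that is larger than both minCompactSize and ratio times the size of the newer files in its window; everything from the first file that passes becomes the selection. Extracted as a standalone sketch:

    public class RatioSelectSketch {
      // fileSizes and sumSize as built above; returns the index where selection begins.
      // minFilesToCompact >= 2 keeps start + 1 in bounds inside the loop.
      static int selectionStart(long[] fileSizes, long[] sumSize, long minCompactSize,
          double ratio, int minFilesToCompact) {
        int start = 0;
        while (fileSizes.length - start >= minFilesToCompact &&
            fileSizes[start] > Math.max(minCompactSize, (long) (sumSize[start + 1] * ratio))) {
          ++start;
        }
        return start;
      }

      public static void main(String[] args) {
        long[] fileSizes = {400, 300, 200, 100};
        long[] sumSize = {700, 500, 300, 100};
        // 400 > 1.2 * 500? no -> selection starts at file 0; all four candidates stay eligible
        System.out.println(selectionStart(fileSizes, sumSize, 16, 1.2, 3));
      }
    }
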
@@ -1772,7 +2017,11 @@ public class HStore extends SchemaConfigured implements Store { @Override public boolean throttleCompaction(long compactionSize) { - return compactionManager.throttleCompaction(compactionSize); + // see HBASE-5867 for discussion on the default + long throttlePoint = conf.getLong( + "hbase.regionserver.thread.compaction.throttle", + 2 * this.minFilesToCompact * this.region.memstoreFlushSize); + return compactionSize > throttlePoint; } @Override @@ -1867,7 +2116,7 @@ public class HStore extends SchemaConfigured implements Store { @Override public boolean needsCompaction() { - return compactionManager.needsCompaction(storefiles.size() - filesCompacting.size()); + return (storefiles.size() - filesCompacting.size()) > minFilesToCompact; } @Override @@ -1877,8 +2126,8 @@ public class HStore extends SchemaConfigured implements Store { public static final long FIXED_OVERHEAD = ClassSize.align(SchemaConfigured.SCHEMA_CONFIGURED_UNALIGNED_HEAP_SIZE + - + (18 * ClassSize.REFERENCE) + (4 * Bytes.SIZEOF_LONG) - + (3 * Bytes.SIZEOF_INT) + Bytes.SIZEOF_BOOLEAN); + + (17 * ClassSize.REFERENCE) + (6 * Bytes.SIZEOF_LONG) + + (5 * Bytes.SIZEOF_INT) + Bytes.SIZEOF_BOOLEAN); public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD + ClassSize.OBJECT + ClassSize.REENTRANT_LOCK @@ -1899,15 +2148,6 @@ public class HStore extends SchemaConfigured implements Store { return scanInfo; } - /** - * Refreshes compaction manager class configuration. - * Used for tests only - not plumbed thru any layers. - * TODO: replace when HBASE-3909 is in. - */ - void updateConfiguration() { - setCompactionPolicy(conf.get(COMPACTION_MANAGER_CLASS, DEFAULT_COMPACTION_MANAGER_CLASS)); - } - /** * Immutable information for scans over a store. */ diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java index 54d6390c64f..d391a16df74 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java @@ -18,8 +18,6 @@ package org.apache.hadoop.hbase.regionserver; import java.io.IOException; -import java.lang.reflect.InvocationTargetException; -import java.util.Collection; import java.util.List; import java.util.NavigableSet; @@ -206,12 +204,6 @@ public interface Store extends SchemaAware, HeapSize { * @return true if the store has any underlying reference files to older HFiles */ public boolean hasReferences(); - - /* - * @param files - * @return True if any of the files in files are References. - */ - public boolean hasReferences(Collection files); /** * @return The size of this store's memstore, in bytes @@ -275,11 +267,6 @@ public interface Store extends SchemaAware, HeapSize { * @return the total size of all Bloom filters in the store */ public long getTotalStaticBloomSize(); - - /** - * Returns the TTL for this store's column family. - */ - public long getTtl(); // Test-helper methods @@ -300,10 +287,4 @@ public interface Store extends SchemaAware, HeapSize { * @return the parent region hosting this store */ public HRegion getHRegion(); - - /** - * @return A hash code depending on the state of the current store files. 
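
The restored throttleCompaction above classifies compactions by total size so that large ones can be throttled separately; the default cut-off is 2 x minFilesToCompact x memstoreFlushSize (see HBASE-5867). A quick sketch with hypothetical numbers:

    public class ThrottleSketch {
      static boolean isLargeCompaction(long totalBytes, int minFilesToCompact,
          long memstoreFlushSize) {
        long throttlePoint = 2L * minFilesToCompact * memstoreFlushSize;
        return totalBytes > throttlePoint;
      }

      public static void main(String[] args) {
        long flushSize = 128L * 1024 * 1024; // hypothetical 128 MB memstore flush size
        // cut-off = 2 * 3 * 128 MB = 768 MB
        System.out.println(isLargeCompaction(900L * 1024 * 1024, 3, flushSize)); // true
        System.out.println(isLargeCompaction(100L * 1024 * 1024, 3, flushSize)); // false
      }
    }
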
- * This is used as seed for deterministic random generator for selecting major compaction time - */ - public Integer getDeterministicRandomSeed(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java index 6fc0beb8cf5..af1225a1481 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java @@ -67,7 +67,6 @@ import org.apache.hadoop.hbase.util.BloomFilter; import org.apache.hadoop.hbase.util.BloomFilterFactory; import org.apache.hadoop.hbase.util.BloomFilterWriter; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.Writables; import org.apache.hadoop.io.RawComparator; @@ -115,9 +114,6 @@ public class StoreFile extends SchemaConfigured { /** Max Sequence ID in FileInfo */ public static final byte [] MAX_SEQ_ID_KEY = Bytes.toBytes("MAX_SEQ_ID_KEY"); - /** Min Flush time in FileInfo */ - public static final byte [] MIN_FLUSH_TIME = Bytes.toBytes("MIN_FLUSH_TIME"); - /** Major compaction flag in FileInfo */ public static final byte[] MAJOR_COMPACTION_KEY = Bytes.toBytes("MAJOR_COMPACTION_KEY"); @@ -147,9 +143,6 @@ public class StoreFile extends SchemaConfigured { // Need to make it 8k for testing. public static final int DEFAULT_BLOCKSIZE_SMALL = 8 * 1024; - /** Default value for files without minFlushTime in metadata */ - public static final long NO_MIN_FLUSH_TIME = -1; - private final FileSystem fs; // This file's path. @@ -176,8 +169,6 @@ public class StoreFile extends SchemaConfigured { // Keys for metadata stored in backing HFile. // Set when we obtain a Reader. private long sequenceid = -1; - // default value is -1, remains -1 if file written without minFlushTime - private long minFlushTime = NO_MIN_FLUSH_TIME; // max of the MemstoreTS in the KV's in this store // Set when we obtain a Reader. @@ -390,22 +381,6 @@ public class StoreFile extends SchemaConfigured { return this.sequenceid; } - public boolean hasMinFlushTime() { - return this.minFlushTime != NO_MIN_FLUSH_TIME; - } - - public long getMinFlushTime() { - // BulkLoad files are assumed to contain very old data, return 0 - if (isBulkLoadResult() && getMaxSequenceId() <= 0) { - return 0; - } else if (this.minFlushTime == NO_MIN_FLUSH_TIME) { - // File written without minFlushTime field assume recent data - return EnvironmentEdgeManager.currentTimeMillis(); - } else { - return this.minFlushTime; - } - } - public long getModificationTimeStamp() { return modificationTimeStamp; } @@ -612,10 +587,7 @@ public class StoreFile extends SchemaConfigured { } } } - b = metadataMap.get(MIN_FLUSH_TIME); - if (b != null) { - this.minFlushTime = Bytes.toLong(b); - } + this.reader.setSequenceID(this.sequenceid); b = metadataMap.get(HFileWriterV2.MAX_MEMSTORE_TS_KEY); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/TierCompactionConfiguration.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/TierCompactionConfiguration.java deleted file mode 100644 index 725aa5c0d59..00000000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/TierCompactionConfiguration.java +++ /dev/null @@ -1,267 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
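
The MIN_FLUSH_TIME plumbing removed above follows the usual HFile file-info pattern: a long is written under a byte[] key at flush or compaction time and parsed back when the reader opens. A minimal model of that round trip, with a sorted map standing in for the HFile's file-info block:

    import java.util.Map;
    import java.util.TreeMap;
    import org.apache.hadoop.hbase.util.Bytes;

    public class FileInfoRoundTripSketch {
      static final byte[] MIN_FLUSH_TIME = Bytes.toBytes("MIN_FLUSH_TIME");
      static final long NO_MIN_FLUSH_TIME = -1;

      public static void main(String[] args) {
        // write side, cf. writer.appendFileInfo(MIN_FLUSH_TIME, Bytes.toBytes(ts))
        Map<byte[], byte[]> metadataMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
        metadataMap.put(MIN_FLUSH_TIME, Bytes.toBytes(1351634681000L));

        // read side: a missing key means the file predates the field
        byte[] b = metadataMap.get(MIN_FLUSH_TIME);
        long minFlushTime = (b == null) ? NO_MIN_FLUSH_TIME : Bytes.toLong(b);
        System.out.println(minFlushTime);
      }
    }
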
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hbase.regionserver; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.util.StringUtils; - -import java.text.DecimalFormat; - -/** - * Control knobs for default compaction algorithm - */ -@InterfaceAudience.Private -public class TierCompactionConfiguration extends CompactionConfiguration { - - private CompactionTier[] compactionTier; - private boolean recentFirstOrder; - - TierCompactionConfiguration(Configuration conf, Store store) { - super(conf, store); - - String strPrefix = "hbase.hstore.compaction."; - String strSchema = "tbl." + store.getHRegion().getTableDesc().getNameAsString() - + "cf." + store.getFamily().getNameAsString() + "."; - String strDefault = "Default."; - String strAttribute; - // If value not set for family, use default family (by passing null). - // If default value not set, use 1 tier. - - strAttribute = "NumCompactionTiers"; - compactionTier = new CompactionTier[ - conf.getInt(strPrefix + strSchema + strAttribute, - conf.getInt(strPrefix + strDefault + strAttribute, - 1))]; - - strAttribute = "IsRecentFirstOrder"; - recentFirstOrder = - conf.getBoolean(strPrefix + strSchema + strAttribute, - conf.getBoolean(strPrefix + strDefault + strAttribute, - true)); - - strAttribute = "MinCompactSize"; - minCompactSize = - conf.getLong(strPrefix + strSchema + strAttribute, - conf.getLong(strPrefix + strDefault + strAttribute, - 0)); - - strAttribute = "MaxCompactSize"; - maxCompactSize = - conf.getLong(strPrefix + strSchema + strAttribute, - conf.getLong(strPrefix + strDefault + strAttribute, - Long.MAX_VALUE)); - - strAttribute = "ShouldDeleteExpired"; - shouldDeleteExpired = - conf.getBoolean(strPrefix + strSchema + strAttribute, - conf.getBoolean(strPrefix + strDefault + strAttribute, - shouldDeleteExpired)); - - strAttribute = "ThrottlePoint"; - throttlePoint = - conf.getLong(strPrefix + strSchema + strAttribute, - conf.getLong(strPrefix + strDefault + strAttribute, - throttlePoint)); - - strAttribute = "MajorCompactionPeriod"; - majorCompactionPeriod = - conf.getLong(strPrefix + strSchema + strAttribute, - conf.getLong(strPrefix + strDefault + strAttribute, - majorCompactionPeriod)); - - strAttribute = "MajorCompactionJitter"; - majorCompactionJitter = - conf.getFloat( - strPrefix + strSchema + strAttribute, - conf.getFloat( - strPrefix + strDefault + strAttribute, - majorCompactionJitter - ) - ); - - for (int i = 0; i < compactionTier.length; i++) { - compactionTier[i] = new CompactionTier(i); - } - } - /** - * @return Number of compaction Tiers - */ - int getNumCompactionTiers() { - return compactionTier.length; - } - - /** - * @return The i-th tier from most recent - */ - CompactionTier getCompactionTier(int i) { - return compactionTier[i]; - } - - /** - * @return 
Whether the tiers will be checked for compaction from newest to oldest - */ - boolean isRecentFirstOrder() { - return recentFirstOrder; - } - - /** - * Parameters for each tier - */ - class CompactionTier { - - private long maxAgeInDisk; - private long maxSize; - private double tierCompactionRatio; - private int tierMinFilesToCompact; - private int tierMaxFilesToCompact; - private int endingIndexForTier; - - CompactionTier(int tier) { - String strPrefix = "hbase.hstore.compaction."; - String strSchema = "tbl." + store.getHRegion().getTableDesc().getNameAsString() - + "cf." + store.getFamily().getNameAsString() + "."; - String strDefault = "Default."; - String strDefTier = ""; - String strTier = "Tier." + String.valueOf(tier) + "."; - String strAttribute; - - /** - * Use value set for current family, current tier - * If not set, use value set for current family, default tier - * if not set, use value set for Default family, current tier - * If not set, use value set for Default family, default tier - * Else just use a default value - */ - - strAttribute = "MaxAgeInDisk"; - maxAgeInDisk = - conf.getLong(strPrefix + strSchema + strTier + strAttribute, - conf.getLong(strPrefix + strDefault + strTier + strAttribute, - Long.MAX_VALUE)); - - strAttribute = "MaxSize"; - maxSize = - conf.getLong(strPrefix + strSchema + strTier + strAttribute, - conf.getLong(strPrefix + strDefault + strTier + strAttribute, - Long.MAX_VALUE)); - - strAttribute = "CompactionRatio"; - tierCompactionRatio = (double) - conf.getFloat(strPrefix + strSchema + strTier + strAttribute, - conf.getFloat(strPrefix + strSchema + strDefTier + strAttribute, - conf.getFloat(strPrefix + strDefault + strTier + strAttribute, - conf.getFloat(strPrefix + strDefault + strDefTier + strAttribute, - (float) compactionRatio)))); - - strAttribute = "MinFilesToCompact"; - tierMinFilesToCompact = - conf.getInt(strPrefix + strSchema + strTier + strAttribute, - conf.getInt(strPrefix + strSchema + strDefTier + strAttribute, - conf.getInt(strPrefix + strDefault + strTier + strAttribute, - conf.getInt(strPrefix + strDefault + strDefTier + strAttribute, - minFilesToCompact)))); - - strAttribute = "MaxFilesToCompact"; - tierMaxFilesToCompact = - conf.getInt(strPrefix + strSchema + strTier + strAttribute, - conf.getInt(strPrefix + strSchema + strDefTier + strAttribute, - conf.getInt(strPrefix + strDefault + strTier + strAttribute, - conf.getInt(strPrefix + strDefault + strDefTier + strAttribute, - maxFilesToCompact)))); - - strAttribute = "EndingIndexForTier"; - endingIndexForTier = - conf.getInt(strPrefix + strSchema + strTier + strAttribute, - conf.getInt(strPrefix + strDefault + strTier + strAttribute, - tier)); - - //make sure this value is not incorrectly set - if (endingIndexForTier < 0 || endingIndexForTier > tier) { - LOG.error("EndingIndexForTier improperly set. 
Using default value."); - endingIndexForTier = tier; - } - - } - - /** - * @return Upper bound on storeFile's minFlushTime to be included in this tier - */ - long getMaxAgeInDisk() { - return maxAgeInDisk; - } - - /** - * @return Upper bound on storeFile's size to be included in this tier - */ - long getMaxSize() { - return maxSize; - } - - /** - * @return Compaction ratio for selections of this tier - */ - double getCompactionRatio() { - return tierCompactionRatio; - } - - /** - * @return Lower bound on the number of files in selections of this tier - */ - int getMinFilesToCompact() { - return tierMinFilesToCompact; - } - - /** - * @return Upper bound on the number of files in selections of this tier - */ - int getMaxFilesToCompact() { - return tierMaxFilesToCompact; - } - - /** - * @return the newest tier which will also be included in selections of this tier; - * by default it is the index of this tier, and it must be between 0 and this tier - */ - int getEndingIndexForTier() { - return endingIndexForTier; - } - - String getDescription() { - String ageString = "INF"; - String sizeString = "INF"; - if (getMaxAgeInDisk() < Long.MAX_VALUE) { - ageString = StringUtils.formatTime(getMaxAgeInDisk()); - } - if (getMaxSize() < Long.MAX_VALUE) { - sizeString = StringUtils.humanReadableInt(getMaxSize()); - } - String ret = "Has files up to age " + ageString - + " and up to size " + sizeString + ". " - + "Compaction ratio: " + (new DecimalFormat("#.##")).format(getCompactionRatio()) + ", " - + "Compaction Selection with at least " + getMinFilesToCompact() + " and " - + "at most " + getMaxFilesToCompact() + " files possible. " - + "Selections in this tier include files up to tier " + getEndingIndexForTier(); - return ret; - } - - } - -} \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/TierCompactionManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/TierCompactionManager.java deleted file mode 100644 index bd9130ab770..00000000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/TierCompactionManager.java +++ /dev/null @@ -1,256 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.hbase.regionserver; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; -import org.apache.hadoop.hbase.regionserver.compactions.CompactSelection; -import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; -import org.apache.hadoop.util.StringUtils; - -import java.io.IOException; - -@InterfaceAudience.Private -public class TierCompactionManager extends CompactionManager { - - private static final Log LOG = LogFactory.getLog(TierCompactionManager.class); - - private int[] endInTier; - private int[] tierOf; - - private TierCompactionConfiguration tierConf; - - TierCompactionManager(Configuration configuration, Store store) { - super(configuration, store); - comConf = new TierCompactionConfiguration(configuration, store); - tierConf = (TierCompactionConfiguration) comConf; - } - - /** - * @param candidates pre-filtrate - * @return filtered subset - * -- Tier Based minor compaction selection algorithm: Choose CompactSelection from candidates -- - *

- * First exclude bulk-load files if the configuration so indicates. - * Arrange files from oldest to newest, then select an appropriate ['start','end') pair: - * try 'start' from oldest to newest (smallest to largest fileIndex); - * for each value, identify the corresponding 'end' fileIndex, and - * stop when the range ['start','end') is an admissible compaction. - *
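In code form, the scan just described is a short loop; the sketch below is illustrative only (next() exists in this class, while endFor() and isAdmissible() are hypothetical stand-ins for inline logic of applyCompactionPolicy):

int start = -1;
int end = -1;
for (int attempt = 0; attempt < countOfFiles; attempt++) {
  start = next(start);            // next candidate 'start', in tier order
  end = endFor(start);            // 'end' implied by the tier of 'start'
  if (isAdmissible(start, end)) {
    break;                        // first admissible ['start','end') wins
  }
}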

- * Notes: - *

- * a compaction is admissible if - * - fileSize[start] is at most maxCompactSize AND - * - the number of files is at least currentTier.minFilesToCompact AND - * - (fileSize[start] is at most ratio times the total size of the remaining files in the compaction OR - * - fileSize[start] is at most minCompactSize) - *
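Restated against the variables applyCompactionPolicy actually uses below (fileSizes, sumSize, tier, tierConf), the admissibility test collapses to a single predicate; this restatement is a sketch, not code from the patch:

boolean admissible =
    fileSizes[start] <= tierConf.getMaxCompactSize()
    && end - start >= tier.getMinFilesToCompact()
    && (fileSizes[start] <= tierConf.getMinCompactSize()
        || fileSizes[start] <= (sumSize[start + 1] - sumSize[end]) * tier.getCompactionRatio());

Here sumSize[start + 1] - sumSize[end] is the "rest of the files" term: the combined size of the other files in the candidate range.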

- * end is endInTier[tierOf[start].endingInclusionTier] - * By default currentTier.endingIndexForTier = currentTier, so in the default - * case 'end' is always 1 + the last fileIndex in currentTier, making sure - * files from different tiers are never selected together in the default case - * normal skew: - * - * older ----> newer (increasing seqID, increasing minFlushTime) - * - * Tier 2 | Tier 1 | Tier 0 - * | | - * _ | | - * | | | _ | - * | | | | | _ | - * --|-|-|-|-|- |-|-|--_-------_------- minCompactSize - * | | | | | | | | | | _ | | - * | | | | | | | | | | | | | | - * | | | | | | | | | | | | | | - */ - @Override - CompactSelection applyCompactionPolicy(CompactSelection candidates) throws IOException { - // we're doing a minor compaction, let's see what files are applicable - int start = -1; - int end = -1; - - // skip selection algorithm if we don't have enough files - if (candidates.getFilesToCompact().isEmpty()) { - candidates.emptyFileList(); - return candidates; - } - - // get store file sizes for incremental compacting selection. - int countOfFiles = candidates.getFilesToCompact().size(); - long[] fileSizes = new long[countOfFiles]; - StoreFile file; - long[] sumSize = new long[countOfFiles + 1]; - sumSize[countOfFiles] = 0; - for (int i = countOfFiles - 1; i >= 0; --i) { - file = candidates.getFilesToCompact().get(i); - fileSizes[i] = file.getReader().length(); - // calculate the sum of fileSizes[i,i+maxFilesToCompact-1) for algo - sumSize[i] = fileSizes[i] + sumSize[i + 1]; - } - - /** - * divide into tiers: - * assign tierOf[fileIndex] = tierIndex - * assign endInTier[tierIndex] = 1 + index of the last file in tierIndex - */ - // Backward compatibility - if files with indices < i don't have minFlushTime field, then - // all of them get tierOf[i]. If no file has minFlushTime all gets tier zero. 
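// For illustration: with numTiers == 3 and five files indexed oldest(0) to newest(4),
// where files 3 and 4 fall in tier 0, file 2 in tier 1, and files 0 and 1 in tier 2,
// the loop below produces tierOf = {2, 2, 1, 0, 0} and endInTier = {5, 3, 2, 0};
// endInTier[i] is 1 + the index of the newest file in tier i, and endInTier[numTiers]
// stays 0 as a sentinel.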
- int numTiers = tierConf.getNumCompactionTiers(); - TierCompactionConfiguration.CompactionTier tier; - tierOf = new int[countOfFiles]; - endInTier = new int[numTiers + 1]; - endInTier[numTiers] = 0; - - LOG.info("Applying TierCompactionPolicy with " + countOfFiles + " files"); - - int i; - int j = countOfFiles; - - for (i = 0; i < numTiers; i++) { - tier = tierConf.getCompactionTier(i); - endInTier[i] = j; - while (j > 0) { - file = candidates.getFilesToCompact().get(j - 1); - if (!isInTier(file, tier)) { - break; - } - j--; - tierOf[j] = i; - } - } - - long restSize; - double ratio; - - //Main algorithm - for (j = 0; j < countOfFiles; j++) { - start = next(start); - tier = tierConf.getCompactionTier(tierOf[start]); - end = endInTier[tier.getEndingIndexForTier()]; - restSize = sumSize[start + 1] - sumSize[end]; - ratio = tier.getCompactionRatio(); - if (fileSizes[start] <= tierConf.getMaxCompactSize() && - end - start >= tier.getMinFilesToCompact() && - (fileSizes[start] <= tierConf.getMinCompactSize() || - (fileSizes[start] <= restSize * ratio))) { - break; - } - } - String tab = " "; - for (i = 0; i < numTiers; i++) { - LOG.info("Tier " + i + " : " + tierConf.getCompactionTier(i).getDescription()); - if (endInTier[i] == endInTier[i+1]) { - LOG.info(tab + "No file is assigned to this tier."); - } else { - LOG.info(tab + (endInTier[i] - endInTier[i+1]) - + " file(s) are assigned to this tier with serial number(s) " - + endInTier[i + 1] + " to " + (endInTier[i] - 1)); - } - for (j = endInTier[i + 1]; j < endInTier[i]; j++) { - file = candidates.getFilesToCompact().get(j); - LOG.info(tab + tab + "SeqID = " + file.getMaxSequenceId() - + ", Age = " + StringUtils.formatTimeDiff( - EnvironmentEdgeManager.currentTimeMillis(), file.getMinFlushTime()) - + ", Size = " + StringUtils.humanReadableInt(fileSizes[j]) - + ", Path = " + file.getPath()); - } - } - if (start < countOfFiles) { - end = Math.min(end, start - + tierConf.getCompactionTier(tierOf[start]).getMaxFilesToCompact()); - } - if (start < end) { - String strTier = String.valueOf(tierOf[start]); - if (tierOf[end - 1] != tierOf[start]) { - strTier += " to " + tierOf[end - 1]; - } - LOG.info("Tier Based compaction algorithm has selected " + (end - start) - + " files from tier " + strTier + " out of " + countOfFiles + " candidates"); - } - - candidates = candidates.getSubList(start, end); - return candidates; - } - - private boolean isInTier(StoreFile file, TierCompactionConfiguration.CompactionTier tier) { - return file.getReader().length() <= tier.getMaxSize() && - EnvironmentEdgeManager.currentTimeMillis()-file.getMinFlushTime() <= tier.getMaxAgeInDisk(); - } - - /** - * This function iterates over the start values in order. - * Whenever an admissible compaction is found, we return the selection. - * Hence the order is important if there are more than one admissible compaction. - * @param start current Value - * @return next Value - */ - private int next(int start) { - if (tierConf.isRecentFirstOrder()) { - return backNext(start); - } - return fwdNext(start); - } - - /** - * This function iterates over the start values in newer-first order of tiers, - * but older-first order of files within a tier. 
- * For example, suppose the tiers are: - * Tier 3 - files 0,1,2 - * Tier 2 - files 3,4 - * Tier 1 - no files - * Tier 0 - files 5,6,7 - * Then the order of 'start' files will be: - * 5,6,7,3,4,0,1,2 - * @param start current Value - * @return next Value - */ - private int backNext(int start) { - int tier = 0; - if (start == -1) { - while (endInTier[tier] >= endInTier[0]) { - tier++; - } - return endInTier[tier]; - } - tier = tierOf[start]; - if (endInTier[tier] == start + 1) { - tier++; - start = endInTier[tier]; - while (endInTier[tier] >= start) { - tier++; - } - return endInTier[tier]; - } - return start + 1; - } - - /** - * This function iterates over the start values in older-first order of files. - * @param start current Value - * @return next Value - */ - private int fwdNext(int start) { - return start + 1; - } - -} \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactSelection.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactSelection.java index 66d8a0f9e72..d240b87e08e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactSelection.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactSelection.java @@ -19,13 +19,15 @@ package org.apache.hadoop.hbase.regionserver.compactions; import java.util.ArrayList; +import java.util.Calendar; +import java.util.GregorianCalendar; import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.regionserver.StoreFile; -import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; @InterfaceAudience.Private public class CompactSelection { @@ -46,15 +48,37 @@ public class CompactSelection { */ private final static Object compactionCountLock = new Object(); + // HBase conf object + Configuration conf; // was this compaction promoted to an off-peak boolean isOffPeakCompaction = false; - // CompactSelection object creation time. - private final long selectionTime; + // compactRatio: double on purpose! Float.MAX < Long.MAX < Double.MAX + // With float, java will downcast your long to float for comparisons (bad) + private double compactRatio; + // compaction ratio off-peak + private double compactRatioOffPeak; + // offpeak start time + private int offPeakStartHour = -1; + // off peak end time + private int offPeakEndHour = -1; - public CompactSelection(List filesToCompact) { - this.selectionTime = EnvironmentEdgeManager.currentTimeMillis(); + public CompactSelection(Configuration conf, List filesToCompact) { this.filesToCompact = filesToCompact; - this.isOffPeakCompaction = false; + this.conf = conf; + this.compactRatio = conf.getFloat("hbase.hstore.compaction.ratio", 1.2F); + this.compactRatioOffPeak = conf.getFloat("hbase.hstore.compaction.ratio.offpeak", 5.0F); + + // Peak time is from [offPeakStartHour, offPeakEndHour). Valid numbers are [0, 23] + this.offPeakStartHour = conf.getInt("hbase.offpeak.start.hour", -1); + this.offPeakEndHour = conf.getInt("hbase.offpeak.end.hour", -1); + if (!isValidHour(this.offPeakStartHour) || !isValidHour(this.offPeakEndHour)) { + if (!(this.offPeakStartHour == -1 && this.offPeakEndHour == -1)) { + LOG.warn("Invalid start/end hour for peak hour : start = " + + this.offPeakStartHour + " end = " + this.offPeakEndHour + + ". 
Valid numbers are [0-23]"); + } + this.offPeakStartHour = this.offPeakEndHour = -1; + } } /** @@ -89,25 +113,49 @@ public class CompactSelection { } if (hasExpiredStoreFiles) { - expiredSFSelection = new CompactSelection(expiredStoreFiles); + expiredSFSelection = new CompactSelection(conf, expiredStoreFiles); } return expiredSFSelection; } + /** + * If the current hour falls in the off-peak window and there are no + * outstanding off-peak compactions, the current compaction is + * promoted to an off-peak compaction. At most one off-peak compaction + * is allowed in the compaction queue at a time. + * + * @return the compaction ratio to use: the off-peak ratio if this + * selection was promoted to off-peak, the regular ratio otherwise + */ + public double getCompactSelectionRatio() { + double r = this.compactRatio; + synchronized(compactionCountLock) { + if (isOffPeakHour() && numOutstandingOffPeakCompactions == 0) { + r = this.compactRatioOffPeak; + numOutstandingOffPeakCompactions++; + isOffPeakCompaction = true; + } + } + if(isOffPeakCompaction) { + LOG.info("Running an off-peak compaction, selection ratio = " + + compactRatioOffPeak + ", numOutstandingOffPeakCompactions is now " + + numOutstandingOffPeakCompactions); + } + return r; + } + /** * The current compaction finished, so reset the off-peak compactions count * if this was an off-peak compaction. */ public void finishRequest() { if (isOffPeakCompaction) { - long newValueToLog = -1; synchronized(compactionCountLock) { - assert !isOffPeakCompaction : "Double-counting off-peak count for compaction"; - newValueToLog = --numOutstandingOffPeakCompactions; + numOutstandingOffPeakCompactions--; isOffPeakCompaction = false; } LOG.info("Compaction done, numOutstandingOffPeakCompactions is now " + - newValueToLog); + numOutstandingOffPeakCompactions); } } @@ -122,14 +170,13 @@ public void emptyFileList() { filesToCompact.clear(); if (isOffPeakCompaction) { - long newValueToLog = -1; synchronized(compactionCountLock) { // reset the off-peak count - newValueToLog = --numOutstandingOffPeakCompactions; + numOutstandingOffPeakCompactions--; isOffPeakCompaction = false; } LOG.info("Nothing to compact, numOutstandingOffPeakCompactions is now " + - newValueToLog); + numOutstandingOffPeakCompactions); } } @@ -137,30 +184,16 @@ return this.isOffPeakCompaction; } - public static long getNumOutStandingOffPeakCompactions() { - synchronized(compactionCountLock) { - return numOutstandingOffPeakCompactions; + private boolean isOffPeakHour() { + int currentHour = (new GregorianCalendar()).get(Calendar.HOUR_OF_DAY); + // If off-peak time checking is disabled just return false. + if (this.offPeakStartHour == this.offPeakEndHour) { + return false; } - } - - /** - * Tries making the compaction off-peak. - * Only checks internal compaction constraints, not timing. - * @return Eventual value of isOffPeakCompaction. 
- */ - public boolean trySetOffpeak() { - assert !isOffPeakCompaction : "Double-setting off-peak for compaction " + this; - synchronized(compactionCountLock) { - if (numOutstandingOffPeakCompactions == 0) { - numOutstandingOffPeakCompactions++; - isOffPeakCompaction = true; - } + if (this.offPeakStartHour < this.offPeakEndHour) { + return (currentHour >= this.offPeakStartHour && currentHour < this.offPeakEndHour); } - return isOffPeakCompaction; - } - - public long getSelectionTime() { - return selectionTime; + return (currentHour >= this.offPeakStartHour || currentHour < this.offPeakEndHour); } public CompactSelection subList(int start, int end) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java index b98dec339dd..bee966800ca 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java @@ -208,10 +208,6 @@ public class CompactionRequest implements Comparable<CompactionRequest>, return p; } - public long getSelectionTime() { - return compactSelection.getSelectionTime(); - } - /** Sets the priority for the request */ public void setPriority(int p) { this.p = p; @@ -276,7 +272,7 @@ public class CompactionRequest implements Comparable<CompactionRequest>, server.checkFileSystem(); } finally { s.finishRequest(this); - LOG.debug("CompactSplitThread Status: " + server.compactSplitThread); + LOG.debug("CompactSplitThread status: " + server.compactSplitThread); } } diff --git a/hbase-server/src/main/resources/hbase-compactions.xml b/hbase-server/src/main/resources/hbase-compactions.xml deleted file mode 100644 index 2d06e29dd64..00000000000 --- a/hbase-server/src/main/resources/hbase-compactions.xml +++ /dev/null @@ -1,160 +0,0 @@ - - - - - - hbase.hstore.compactionThreshold - 3 - - If more than this number of HStoreFiles in any one HStore - (one HStoreFile is written per flush of memstore) then a compaction - is run to rewrite all HStoreFiles as one. Larger numbers - put off compaction, but when it runs, it takes longer to complete. - During a compaction, updates cannot be flushed to disk. Long - compactions require memory sufficient to carry the logging of - all updates across the duration of the compaction. - - If too large, clients time out during compaction. - - - - hbase.hstore.compaction.max - 10 - Max number of HStoreFiles to compact per 'minor' compaction. - - - - hbase.hregion.majorcompaction - 86400000 - The time (in milliseconds) between 'major' compactions of all - HStoreFiles in a region. Default: 1 day. - Set to 0 to disable automated major compactions. - - - - - hbase.hstore.compaction.CompactionPolicy - TierBasedCompactionPolicy - The compaction policy which should be used. - - - - hbase.hstore.compaction.tbl.cluster_test.cf.test_cf.NumCompactionTiers - 4 - The number of tiers into which the files are assigned. - - - - - hbase.hstore.compaction.Default.Tier.0.MaxAgeInDisk - 3600000 - Length of time for which flush files are in the 1st tier - value one hour. - - - - hbase.hstore.compaction.tbl.cluster_test.cf.test_cf.Tier.1.MaxAgeInDisk - 10800000 - Maximum age of a file to be in the second tier - value 3 hours. 
- - - hbase.hstore.compaction.Default.Tier.2.MaxAgeInDisk - 36000000 - Maximum age of a file to be in the third tier - value 10 hours. - - - - - hbase.hstore.compaction.Default.CompactionRatio - 0.0 - The default compaction ratio used if unspecified. - value 0. - - - - hbase.hstore.compaction.Default.Tier.1.CompactionRatio - 1.0 - The compaction ratio for the second tier. - value 1.0. - - - - hbase.hstore.compaction.Default.Tier.2.CompactionRatio - 0.75 - The compaction ratio for the third tier. - value 0.75. - - - - hbase.hstore.compaction.Default.Tier.3.CompactionRatio - 0.2 - The compaction ratio for the fourth tier. - value 0.2. - - - - - hbase.hstore.compaction.min - 2 - Default minimum number of files to compact - value 2. - - - - hbase.hstore.compaction.tbl.cluster_test.cf.MinFilesToCompact - 3 - Overridden default minimum number of files to compact - value 3. - - - - hbase.hstore.compaction.max - 7 - Default maximum number of files to compact - value 7. - - - - hbase.hstore.compaction.Default.Tier.1.MinFilesToCompact - 2 - Minimum number of files to compact in the second tier - value 2. - - - - hbase.hstore.compaction.Default.Tier.3.MaxFilesToCompact - 6 - Maximum number of files to compact in the fourth tier - value 6. - - - - - hbase.hstore.compaction.Default.Tier.2.EndingIndexForTier - 1 - The minimum tier whose files go together with this tier - value 1. - - - diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultCompactSelection.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactSelection.java similarity index 74% rename from hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultCompactSelection.java rename to hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactSelection.java index 498959236d9..4a26f21f5bb 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultCompactSelection.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactSelection.java @@ -1,4 +1,5 @@ /** + * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -25,7 +26,6 @@ import java.util.GregorianCalendar; import java.util.List; import junit.framework.TestCase; -import org.junit.experimental.categories.Category; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -39,27 +39,26 @@ import org.apache.hadoop.hbase.regionserver.compactions.CompactSelection; import org.apache.hadoop.hbase.regionserver.wal.HLog; import org.apache.hadoop.hbase.regionserver.wal.HLogFactory; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import com.google.common.collect.Lists; +import org.junit.experimental.categories.Category; @Category(SmallTests.class) -public class TestDefaultCompactSelection extends TestCase { - private final static Log LOG = LogFactory.getLog(TestDefaultCompactSelection.class); +public class TestCompactSelection extends TestCase { + private final static Log LOG = LogFactory.getLog(TestCompactSelection.class); private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); - protected Configuration conf; - protected HStore store; + private Configuration conf; + private HStore store; private static final String DIR= TEST_UTIL.getDataTestDir("TestCompactSelection").toString(); private static Path TEST_FILE; - private CompactionManager manager; - protected static final int minFiles = 3; - protected static final int maxFiles = 5; + private static final int minFiles = 3; + private static final int maxFiles = 5; - protected static final long minSize = 10; - protected static final long maxSize = 1000; + private static final long minSize = 10; + private static final long maxSize = 1000; @Override @@ -95,8 +94,6 @@ public class TestDefaultCompactSelection extends TestCase { region = new HRegion(tableDir, hlog, fs, conf, info, htd, null); store = new HStore(basedir, region, hcd, fs, conf); - manager = store.compactionManager; - TEST_FILE = StoreFile.getRandomFilename(fs, store.getHomedir()); fs.create(TEST_FILE); } @@ -105,41 +102,20 @@ public class TestDefaultCompactSelection extends TestCase { static class MockStoreFile extends StoreFile { long length = 0; boolean isRef = false; - long ageInDisk; - long sequenceid; - MockStoreFile(long length, long ageInDisk, boolean isRef, long sequenceid) throws IOException { - super(TEST_UTIL.getTestFileSystem(), TEST_FILE, TEST_UTIL.getConfiguration(), + MockStoreFile(long length, boolean isRef) throws IOException { + super(TEST_UTIL.getTestFileSystem(), TEST_FILE, + TEST_UTIL.getConfiguration(), new CacheConfig(TEST_UTIL.getConfiguration()), BloomType.NONE, NoOpDataBlockEncoder.INSTANCE); this.length = length; - this.isRef = isRef; - this.ageInDisk = ageInDisk; - this.sequenceid = sequenceid; + this.isRef = isRef; } void setLength(long newLen) { this.length = newLen; } - @Override - public boolean hasMinFlushTime() { - return ageInDisk != 0; - } - - @Override - public long getMinFlushTime() { - if (ageInDisk < 0) { - return ageInDisk; - } - return EnvironmentEdgeManager.currentTimeMillis() - ageInDisk; - } - - @Override - public long getMaxSequenceId() { - return sequenceid; - } - @Override boolean isMajorCompaction() { return false; @@ -162,70 +138,43 @@ public class TestDefaultCompactSelection extends TestCase { } } - ArrayList toArrayList(long... numbers) { - ArrayList result = new ArrayList(); - for (long i : numbers) { - result.add(i); - } - return result; + List sfCreate(long ... 
sizes) throws IOException { + return sfCreate(false, sizes); } - List sfCreate(long... sizes) throws IOException { - ArrayList ageInDisk = new ArrayList(); - for (int i = 0; i < sizes.length; i++) { - ageInDisk.add(0L); - } - return sfCreate(toArrayList(sizes), ageInDisk); - } - - List sfCreate(ArrayList sizes, ArrayList ageInDisk) - throws IOException { - return sfCreate(false, sizes, ageInDisk); - } - - List sfCreate(boolean isReference, long... sizes) throws IOException { - ArrayList ageInDisk = new ArrayList(sizes.length); - for (int i = 0; i < sizes.length; i++) { - ageInDisk.add(0L); - } - return sfCreate(isReference, toArrayList(sizes), ageInDisk); - } - - List sfCreate(boolean isReference, ArrayList sizes, ArrayList ageInDisk) - throws IOException { + List sfCreate(boolean isReference, long ... sizes) + throws IOException { List ret = Lists.newArrayList(); - for (int i = 0; i < sizes.size(); i++) { - ret.add(new MockStoreFile(sizes.get(i), ageInDisk.get(i), isReference, i)); + for (long i : sizes) { + ret.add(new MockStoreFile(i, isReference)); } return ret; } long[] getSizes(List sfList) { long[] aNums = new long[sfList.size()]; - for (int i = 0; i < sfList.size(); ++i) { + for (int i=0; i candidates, long... expected) - throws IOException { + + void compactEquals(List candidates, long ... expected) + throws IOException { compactEquals(candidates, false, expected); } - void compactEquals(List candidates, boolean forcemajor, + void compactEquals(List candidates, boolean forcemajor, long ... expected) throws IOException { store.forceMajor = forcemajor; - //Test Default compactions - List actual = store.compactionManager - .selectCompaction(candidates, Store.NO_PRIORITY, forcemajor).getFilesToCompact(); - assertEquals(Arrays.toString(expected), Arrays.toString(getSizes(actual))); + List actual = store.compactSelection(candidates).getFilesToCompact(); store.forceMajor = false; + assertEquals(Arrays.toString(expected), Arrays.toString(getSizes(actual))); } public void testCompactionRatio() throws IOException { - /** + /* * NOTE: these tests are specific to describe the implementation of the * current compaction algorithm. Developed to ensure that refactoring * doesn't implicitly alter this. @@ -242,15 +191,17 @@ public class TestDefaultCompactSelection extends TestCase { compactEquals(sfCreate(tooBig, tooBig, 700,700) /* empty */); // small files = don't care about ratio compactEquals(sfCreate(8,3,1), 8,3,1); - /* TODO: add sorting + unit test back in when HBASE-2856 is fixed - // sort first so you don't include huge file the tail end. + /* TODO: add sorting + unit test back in when HBASE-2856 is fixed + // sort first so you don't include huge file the tail end // happens with HFileOutputFormat bulk migration compactEquals(sfCreate(100,50,23,12,12, 500), 23, 12, 12); */ // don't exceed max file compact threshold + assertEquals(maxFiles, + store.compactSelection(sfCreate(7,6,5,4,3,2,1)).getFilesToCompact().size()); // note: file selection starts with largest to smallest. 
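// For illustration: with maxFiles == 5 the seven candidates below are taken
// largest-first, so only 7,6,5,4,3 are selected; 2 and 1 are left for a later
// minor compaction instead of pushing the selection past the max-files bound.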
compactEquals(sfCreate(7, 6, 5, 4, 3, 2, 1), 7, 6, 5, 4, 3); - + /* MAJOR COMPACTION */ // if a major compaction has been forced, then compact everything compactEquals(sfCreate(50,25,12,12), true, 50, 25, 12, 12); @@ -260,18 +211,15 @@ public class TestDefaultCompactSelection extends TestCase { compactEquals(sfCreate(tooBig, 12,12), true, tooBig, 12, 12); // don't exceed max file compact threshold, even with major compaction store.forceMajor = true; - assertEquals(maxFiles, - manager.selectCompaction(sfCreate(7, 6, 5, 4, 3, 2, 1), Store.NO_PRIORITY, false) - .getFilesToCompact().size()); compactEquals(sfCreate(7, 6, 5, 4, 3, 2, 1), 7, 6, 5, 4, 3); store.forceMajor = false; + // if we exceed maxCompactSize, downgrade to minor // if not, it creates a 'snowball effect' when files >> maxCompactSize: // the last file in compaction is the aggregate of all previous compactions compactEquals(sfCreate(100,50,23,12,12), true, 23, 12, 12); conf.setLong(HConstants.MAJOR_COMPACTION_PERIOD, 1); conf.setFloat("hbase.hregion.majorcompaction.jitter", 0); - store.updateConfiguration(); try { // trigger an aged major compaction compactEquals(sfCreate(50,25,12,12), 50, 25, 12, 12); @@ -288,12 +236,15 @@ public class TestDefaultCompactSelection extends TestCase { // reference files shouldn't obey max threshold compactEquals(sfCreate(true, tooBig, 12,12), tooBig, 12, 12); // reference files should obey max file compact to avoid OOM - compactEquals(sfCreate(true, 7, 6, 5, 4, 3, 2, 1), 7, 6, 5, 4, 3); - + assertEquals(maxFiles, + store.compactSelection(sfCreate(true, 7,6,5,4,3,2,1)).getFilesToCompact().size()); + // reference compaction + compactEquals(sfCreate(true, 7, 6, 5, 4, 3, 2, 1), 5, 4, 3, 2, 1); + // empty case compactEquals(new ArrayList() /* empty */); // empty case (because all files are too big) - compactEquals(sfCreate(tooBig, tooBig) /* empty */); + compactEquals(sfCreate(tooBig, tooBig) /* empty */); } public void testOffPeakCompactionRatio() throws IOException { @@ -307,7 +258,7 @@ public class TestDefaultCompactSelection extends TestCase { Calendar calendar = new GregorianCalendar(); int hourOfDay = calendar.get(Calendar.HOUR_OF_DAY); LOG.debug("Hour of day = " + hourOfDay); - int hourPlusOne = ((hourOfDay+1)%24); + int hourPlusOne = ((hourOfDay+1+24)%24); int hourMinusOne = ((hourOfDay-1+24)%24); int hourMinusTwo = ((hourOfDay-2+24)%24); @@ -323,16 +274,15 @@ public class TestDefaultCompactSelection extends TestCase { this.conf.setLong("hbase.offpeak.end.hour", hourPlusOne); LOG.debug("Testing compact selection with off-peak settings (" + hourMinusOne + ", " + hourPlusOne + ")"); - // update the compaction policy to include conf changes - store.setCompactionPolicy(CompactionManager.class.getName()); - compactEquals(sfCreate(999, 50, 12, 12, 1), 50, 12, 12, 1); + compactEquals(sfCreate(999,50,12,12, 1), 50, 12, 12, 1); // set peak hour outside current selection and check compact selection this.conf.setLong("hbase.offpeak.start.hour", hourMinusTwo); this.conf.setLong("hbase.offpeak.end.hour", hourMinusOne); - store.setCompactionPolicy(CompactionManager.class.getName()); LOG.debug("Testing compact selection with off-peak settings (" + hourMinusTwo + ", " + hourMinusOne + ")"); compactEquals(sfCreate(999,50,12,12, 1), 12, 12, 1); } + } + diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestTierCompactSelection.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestTierCompactSelection.java deleted file mode 100644 index 37ad00eb778..00000000000 --- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestTierCompactSelection.java +++ /dev/null @@ -1,318 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.regionserver; - -import java.io.IOException; -import java.util.*; - -import org.junit.experimental.categories.Category; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.SmallTests; - -@Category(SmallTests.class) -public class TestTierCompactSelection extends TestDefaultCompactSelection { - private final static Log LOG = LogFactory.getLog(TestTierCompactSelection.class); - - private static final int numTiers = 4; - - private String strPrefix, strSchema, strTier; - - - @Override - public void setUp() throws Exception { - - super.setUp(); - - // setup config values necessary for store - strPrefix = "hbase.hstore.compaction."; - strSchema = "tbl." + store.getHRegion().getTableDesc().getNameAsString() - + "cf." 
+ store.getFamily().getNameAsString() + "."; - - this.conf.setStrings(strPrefix + "CompactionPolicy", "TierBasedCompactionPolicy"); - - this.conf.setLong(HConstants.MAJOR_COMPACTION_PERIOD, 0); - - // The following parameters are for default compaction - // Some of them are used as default values of tier based compaction - this.conf.setInt(strPrefix + "min", 2); - this.conf.setInt(strPrefix + "max", 10); - this.conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 0); - this.conf.setLong(strPrefix + "max.size", 10000); - this.conf.setFloat(strPrefix + "ratio", 10.0F); - - // Specifying the family parameters here - conf.setInt(strPrefix + strSchema + "NumCompactionTiers", numTiers); - conf.setLong(strPrefix + strSchema + "MinCompactSize", minSize); - conf.setLong(strPrefix + strSchema + "MaxCompactSize", maxSize); - - // Specifying parameters for the default tier - strTier = ""; - conf.setFloat(strPrefix + strSchema + strTier + "CompactionRatio", 0.1F); - conf.setInt(strPrefix + strSchema + strTier + "MinFilesToCompact", minFiles); - conf.setInt(strPrefix + strSchema + strTier + "MaxFilesToCompact", maxFiles); - - // Specifying parameters for individual tiers here - - // Don't compact in this tier (likely to be in block cache) - strTier = "Tier.0."; - conf.setFloat(strPrefix + strSchema + strTier + "CompactionRatio", 0.0F); - - // Most aggressive tier - strTier = "Tier.1."; - conf.setFloat(strPrefix + strSchema + strTier + "CompactionRatio", 2.0F); - conf.setInt(strPrefix + strSchema + strTier + "MinFilesToCompact", 2); - conf.setInt(strPrefix + strSchema + strTier + "MaxFilesToCompact", 10); - - // Medium tier - strTier = "Tier.2."; - conf.setFloat(strPrefix + strSchema + strTier + "CompactionRatio", 1.0F); - // Also include files in tier 1 here - conf.setInt(strPrefix + strSchema + strTier + "EndingIndexForTier", 1); - - // Last tier - least aggressive compaction - // has default tier settings only - // Max Time elapsed is Infinity by default - - } - - @Override - void compactEquals( - List candidates, boolean forcemajor, - long... expected - ) - throws IOException { - store.forceMajor = forcemajor; - //update the policy for now in case any change - store.setCompactionPolicy(TierCompactionManager.class.getName()); - List actual = - store.compactionManager.selectCompaction(candidates, Store.NO_PRIORITY, forcemajor).getFilesToCompact(); - store.forceMajor = false; - assertEquals(Arrays.toString(expected), Arrays.toString(getSizes(actual))); - } - - public void testAgeBasedAssignment() throws IOException { - - conf.setLong(strPrefix + strSchema + "Tier.0.MaxAgeInDisk", 10L); - conf.setLong(strPrefix + strSchema + "Tier.1.MaxAgeInDisk", 100L); - conf.setLong(strPrefix + strSchema + "Tier.2.MaxAgeInDisk", 1000L); - conf.setLong(strPrefix + strSchema + "Tier.0.MaxSize", Long.MAX_VALUE); - conf.setLong(strPrefix + strSchema + "Tier.1.MaxSize", Long.MAX_VALUE); - conf.setLong(strPrefix + strSchema + "Tier.2.MaxSize", Long.MAX_VALUE); - - //everything in first tier, don't compact! 
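// For illustration: ages 8..1 all fall under Tier.0.MaxAgeInDisk (10), so every
// file lands in tier 0, whose CompactionRatio is 0.0; since each size also exceeds
// MinCompactSize (10), no range is admissible and the expected selection is empty.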
- compactEquals(sfCreate(toArrayList( - 151, 30, 13, 12, 11 ), toArrayList( // Sizes - 8, 5, 4, 2, 1 )) // ageInDisk ( = currentTime - minFlushTime) - /* empty expected */ ); // Selected sizes - - //below minSize should compact - compactEquals(sfCreate(toArrayList( - 12, 11, 8, 3, 1 ), toArrayList( - 8, 5, 4, 2, 1 )), - 8, 3, 1 ); - - //everything in second tier - compactEquals(sfCreate(toArrayList( - 251, 70, 13, 12, 11 ), toArrayList( - 80, 50, 40, 20, 11 )), - 70, 13, 12, 11 ); - - //everything in third tier - compactEquals(sfCreate(toArrayList( - 251, 70, 13, 12, 11 ), toArrayList( - 800, 500, 400, 200, 110 )), - 13, 12, 11 ); - - //everything in fourth tier - compactEquals(sfCreate(toArrayList( - 251, 70, 13, 12, 11 ), toArrayList( - 8000, 5000, 4000, 2000, 1100 )) - /* empty expected */ ); - - //Valid compaction in 4th tier with ratio 0.10, hits maxFilesToCompact - compactEquals(sfCreate(toArrayList( - 500, 93, 92, 91, 90, 89, 88, 87, 86, 85, 84, 83, 82, 81, 80 ), toArrayList( - 5094, 5093, 5092, 5091, 5090, 5089, 5088, 5087, 5086, 5085, 5084, 5083, 5082, 5081, 5080)), - 93, 92, 91, 90, 89 ); - - //Now mixing tiers 1,0, expected selection in tier 1 only - compactEquals(sfCreate(toArrayList( - 999, 110, 100, 12, 11 ), toArrayList( - 90, 80, 50, 4, 1 )), - 110, 100 ); - - //Mixing tier 2,1, expected selection in tier 2 including tier 1 but not zero - compactEquals(sfCreate(toArrayList( - 999, 110, 100, 12, 11 ), toArrayList( - 900, 800, 500, 40, 1 )), - 110, 100, 12 ); - - //Mixing tier 2,1, expected selection in tier 1 because of recentFirstOrder = true - compactEquals(sfCreate(toArrayList( - 999, 110, 100, 12, 13, 11 ), toArrayList( - 900, 800, 500, 40, 30, 1 )), - 12, 13 ); - - conf.setBoolean(strPrefix + strSchema + "IsRecentFirstOrder", false); - - //Mixing tier 2,1, expected selection in tier 1 because of recentFirstOrder = false - compactEquals(sfCreate(toArrayList( - 999, 110, 100, 12, 13, 11 ), toArrayList( - 900, 800, 500, 40, 30, 1 )), - 110, 100, 12, 13 ); - - //Mixing all tier 3,2,1,0 expected selection in tier 1 only - compactEquals(sfCreate(toArrayList( - 999, 800, 110, 100, 12, 13, 11 ), toArrayList( - 9000, 800, 50, 40, 8, 3, 1 )), - 110, 100 ); - - //Checking backward compatibility, first 3 files don't have minFlushTime, - //all should go to tier 1, not tier 0 - compactEquals(sfCreate(toArrayList( - 999, 800, 110, 100, 12, 13, 11 ), toArrayList( - 0, 0, 0, 40, 8, 3, 1 )), - 999, 800, 110, 100 ); - - //make sure too big files don't get compacted - compactEquals(sfCreate(toArrayList( - 1002, 1001, 999, 800, 700, 12, 13, 11 ), toArrayList( - 900, 80, 50, 40, 30, 20, 4, 2 )), - 999, 800, 700, 12 ); - - } - - public void testSizeBasedAssignment() throws IOException { - - conf.setLong(strPrefix + strSchema + "MinCompactSize", 3); - - conf.setLong(strPrefix + strSchema + "Tier.0.MaxSize", 10L); - conf.setLong(strPrefix + strSchema + "Tier.1.MaxSize", 100L); - conf.setLong(strPrefix + strSchema + "Tier.2.MaxSize", 1000L); - conf.setLong(strPrefix + strSchema + "Tier.0.MaxAgeInDisk", Long.MAX_VALUE); - conf.setLong(strPrefix + strSchema + "Tier.1.MaxAgeInDisk", Long.MAX_VALUE); - conf.setLong(strPrefix + strSchema + "Tier.2.MaxAgeInDisk", Long.MAX_VALUE); - - compactEquals(sfCreate(false, - 500, 3, 2, 1 ), - 3, 2, 1 ); - - compactEquals(sfCreate(false, - 500, 8, 7, 6, 5, 4, 2, 1 ) - /* empty */ ); - - compactEquals(sfCreate(false, - 500, 6, 8, 4, 7, 4, 2, 1 ) - /* empty */ ); - - compactEquals(sfCreate(false, - 500, 23, 11, 8, 4, 1 ) - /* empty */ ); - - 
compactEquals(sfCreate(false, - 500, 11, 23, 8, 4, 1 ), - 11, 23 ); - - compactEquals(sfCreate(false, - 500, 9, 23, 8, 4, 1 ), - 9, 23 ); - - compactEquals(sfCreate(false, - 500, 70, 23, 11, 8, 4, 1 ) - /* empty */ ); - - compactEquals(sfCreate(false, - 500, 60, 23, 11, 8, 4, 1 ), - 60, 23, 11 ); - - compactEquals(sfCreate(false, - 500, 90, 60, 23, 11, 8, 4, 1 ), - 90, 60, 23, 11 ); - - conf.setBoolean(strPrefix + strSchema + "IsRecentFirstOrder", false); - - compactEquals(sfCreate(false, - 500, 450, 60, 23, 11, 8, 4, 1 ), - 500, 450, 60, 23, 11 ); - - compactEquals(sfCreate(false, - 450, 500, 60, 23, 11, 8, 4, 1 ), - 450, 500, 60, 23, 11 ); - - compactEquals(sfCreate(false, - 1013, 1012, 1011, 1010, 1009, 1008, 1007, 1006, 1005, 1004, 1003, 1002, 1001, 999, 450, 550 ), - 999, 450, 550 ); - - conf.setLong(strPrefix + strSchema + "MaxCompactSize", 10000); - - compactEquals(sfCreate(false, - 1013, 1012, 1011, 1010, 1009, 1008, 1007, 1006, 1005, 1004, 1003, 1002, 1001, 999, 450, 550 ), - 1013, 1012, 1011, 1010, 1009 ); - - compactEquals(sfCreate(false, - 1013, 992, 1011, 1010, 1009, 1008, 1007, 1006, 1005, 1004, 1003, 1002, 1001, 999, 450, 550), - 1013, 992, 1011, 1010, 1009 ); - - compactEquals(sfCreate(false, - 992, 993, 1011, 990, 1009, 998, 1007, 996, 1005, 994, 1003, 992, 1001, 999, 450, 550 ), - 992, 993, 1011, 990, 1009 ); - - conf.setBoolean(strPrefix + strSchema + "IsRecentFirstOrder", true); - - compactEquals(sfCreate(false, - 500, 450, 60, 23, 11, 8, 4, 1 ), - 60, 23, 11 ); - - compactEquals(sfCreate(false, - 450, 500, 60, 23, 11, 8, 4, 1 ), - 60, 23, 11 ); - - compactEquals(sfCreate(false, - 1013, 1012, 1011, 1010, 1009, 1008, 1007, 1006, 1005, 1004, 1003, 1002, 1001, 999, 450, 550 ), - 999, 450, 550 ); - - compactEquals(sfCreate(false, - 992, 993, 1011, 990, 1009, 998, 1007, 996, 1005, 994, 1003, 992, 1001, 999, 450, 550 ), - 999, 450, 550 ); - - compactEquals(sfCreate(false, - 992, 993, 1011, 990, 1009, 998, 1007, 996, 1005, 994, 1003, 992, 991, 999, 450, 550 ), - 992, 991, 999, 450, 550 ); - - compactEquals(sfCreate(false, - 992, 993, 1011, 990, 1009, 998, 1007, 996, 1005, 994, 1003, 992, 991, 999, 450, 550, 1001), - 992, 993, 1011, 990, 1009 ); - - } - - @Override - public void testCompactionRatio() throws IOException { - conf.setInt(strPrefix + strSchema + "NumCompactionTiers", 1); - conf.setFloat(strPrefix + strSchema + "Tier.0.CompactionRatio", 1.0F); - conf.setInt(strPrefix + "max", 5); - super.testCompactionRatio(); - } - - @Override - public void testOffPeakCompactionRatio() throws IOException {} - -}
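A closing note on the configuration scheme the deleted tier code relied on: every per-tier knob is resolved through a four-level fallback (current family + current tier, current family + default tier, "Default" family + current tier, "Default" family + default tier) before a built-in default applies. A minimal, self-contained sketch of that lookup, assuming the illustrative helper name getTierFloat (not part of the patch):

import org.apache.hadoop.conf.Configuration;

public final class TierKeyLookup {
  private static final String PREFIX = "hbase.hstore.compaction.";

  // Resolves PREFIX + schema + "Tier.<t>." + attr with the same fallback order
  // TierCompactionConfiguration.CompactionTier used for CompactionRatio
  // (MinFilesToCompact and MaxFilesToCompact follow the same order via getInt).
  static float getTierFloat(Configuration conf, String schema, int tier,
      String attr, float dflt) {
    String strTier = "Tier." + tier + ".";
    return conf.getFloat(PREFIX + schema + strTier + attr,      // family, this tier
           conf.getFloat(PREFIX + schema + attr,                // family, default tier
           conf.getFloat(PREFIX + "Default." + strTier + attr,  // Default, this tier
           conf.getFloat(PREFIX + "Default." + attr,            // Default, default tier
           dflt))));
  }
}

For example, getTierFloat(conf, "tbl.cluster_test.cf.test_cf.", 1, "CompactionRatio", 1.2F) walks the same keys that the deleted hbase-compactions.xml above populates for the ratio of tier 1.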