HBASE-7055 port HBASE-6371 tier-based compaction from 0.89-fb to trunk (Sergey)
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1403852 13f79535-47bb-0310-9956-ffa450edef68

parent 9fda46c126
commit 028a672e20

@@ -98,6 +98,7 @@ public class HBaseConfiguration extends Configuration {
  public static Configuration addHbaseResources(Configuration conf) {
    conf.addResource("hbase-default.xml");
    conf.addResource("hbase-site.xml");
    conf.addResource("hbase-compactions.xml");

    checkDefaultsVersion(conf);
    checkForClusterFreeMemoryLimit(conf);
@@ -223,6 +223,8 @@ public class HFileOutputFormat extends FileOutputFormat<ImmutableBytesWritable,
            Bytes.toBytes(compactionExclude));
          w.appendFileInfo(StoreFile.TIMERANGE_KEY,
            WritableUtils.toByteArray(trt));
          w.appendFileInfo(StoreFile.MIN_FLUSH_TIME,
            Bytes.toBytes(StoreFile.NO_MIN_FLUSH_TIME));
          w.close();
        }
      }
@@ -0,0 +1,203 @@
/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.regionserver;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;

/**
 * Control knobs for the default compaction algorithm:
 * <p/>
 * maxCompactSize - upper bound on file size to be included in minor compactions
 * minCompactSize - lower bound below which compaction is selected without ratio test
 * minFilesToCompact - lower bound on number of files in any minor compaction
 * maxFilesToCompact - upper bound on number of files in any minor compaction
 * compactionRatio - ratio used for compaction
 * <p/>
 * Set parameters as "hbase.hstore.compaction.<attribute>"
 */

//TODO: revisit this class for online parameter updating

public class CompactionConfiguration {

  static final Log LOG = LogFactory.getLog(CompactionConfiguration.class);

  Configuration conf;
  Store store;

  long maxCompactSize;
  long minCompactSize;
  int minFilesToCompact;
  int maxFilesToCompact;
  double compactionRatio;
  double offPeekCompactionRatio;
  int offPeakStartHour;
  int offPeakEndHour;
  long throttlePoint;
  boolean shouldDeleteExpired;
  long majorCompactionPeriod;
  float majorCompactionJitter;

  CompactionConfiguration(Configuration conf, Store store) {
    this.conf = conf;
    this.store = store;

    String strPrefix = "hbase.hstore.compaction.";

    maxCompactSize = conf.getLong(strPrefix + "max.size", Long.MAX_VALUE);
    minCompactSize = conf.getLong(strPrefix + "min.size", store.getHRegion().memstoreFlushSize);
    minFilesToCompact = Math.max(2, conf.getInt(strPrefix + "min",
      /*old name*/ conf.getInt("hbase.hstore.compactionThreshold", 3)));
    maxFilesToCompact = conf.getInt(strPrefix + "max", 10);
    compactionRatio = conf.getFloat(strPrefix + "ratio", 1.2F);
    offPeekCompactionRatio = conf.getFloat(strPrefix + "ratio.offpeak", 5.0F);
    offPeakStartHour = conf.getInt("hbase.offpeak.start.hour", -1);
    offPeakEndHour = conf.getInt("hbase.offpeak.end.hour", -1);

    if (!isValidHour(offPeakStartHour) || !isValidHour(offPeakEndHour)) {
      if (!(offPeakStartHour == -1 && offPeakEndHour == -1)) {
        LOG.warn("Ignoring invalid start/end hour for peak hour : start = " +
          this.offPeakStartHour + " end = " + this.offPeakEndHour +
          ". Valid numbers are [0-23]");
      }
      this.offPeakStartHour = this.offPeakEndHour = -1;
    }

    throttlePoint = conf.getLong("hbase.regionserver.thread.compaction.throttle",
      2 * maxFilesToCompact * store.getHRegion().memstoreFlushSize);
    shouldDeleteExpired = conf.getBoolean("hbase.store.delete.expired.storefile", true);
    majorCompactionPeriod = conf.getLong(HConstants.MAJOR_COMPACTION_PERIOD, 1000*60*60*24);
    majorCompactionJitter = conf.getFloat("hbase.hregion.majorcompaction.jitter", 0.20F);

    LOG.info("Compaction configuration " + this.toString());
  }

  @Override
  public String toString() {
    return String.format(
      "size [%d, %d); files [%d, %d); ratio %f; off-peak ratio %f; off-peak hours %d-%d; " +
      "throttle point %d;%s delete expired; major period %d, major jitter %f",
      minCompactSize,
      maxCompactSize,
      minFilesToCompact,
      maxFilesToCompact,
      compactionRatio,
      offPeekCompactionRatio,
      offPeakStartHour,
      offPeakEndHour,
      throttlePoint,
      shouldDeleteExpired ? "" : " don't",
      majorCompactionPeriod,
      majorCompactionJitter);
  }

  /**
   * @return lower bound below which compaction is selected without ratio test
   */
  long getMinCompactSize() {
    return minCompactSize;
  }

  /**
   * @return upper bound on file size to be included in minor compactions
   */
  long getMaxCompactSize() {
    return maxCompactSize;
  }

  /**
   * @return lower bound on number of files to be included in minor compactions
   */
  int getMinFilesToCompact() {
    return minFilesToCompact;
  }

  /**
   * @return upper bound on number of files to be included in minor compactions
   */
  int getMaxFilesToCompact() {
    return maxFilesToCompact;
  }

  /**
   * @return ratio used for compaction
   */
  double getCompactionRatio() {
    return compactionRatio;
  }

  /**
   * @return off-peak ratio used for compaction
   */
  double getCompactionRatioOffPeak() {
    return offPeekCompactionRatio;
  }

  /**
   * @return hour at which off-peak compactions start
   */
  int getOffPeakStartHour() {
    return offPeakStartHour;
  }

  /**
   * @return hour at which off-peak compactions end
   */
  int getOffPeakEndHour() {
    return offPeakEndHour;
  }

  /**
   * @return throttle point used for classifying small and large compactions
   */
  long getThrottlePoint() {
    return throttlePoint;
  }

  /**
   * @return major compaction period.
   * Major compactions are selected periodically according to this parameter plus jitter
   */
  long getMajorCompactionPeriod() {
    return majorCompactionPeriod;
  }

  /**
   * @return the jitter fraction: the fraction of majorCompactionPeriod within which the
   * major compaction time is randomly chosen for each store.
   */
  float getMajorCompactionJitter() {
    return majorCompactionJitter;
  }

  /**
   * @return whether expired files should be deleted ASAP using compactions
   */
  boolean shouldDeleteExpired() {
    return shouldDeleteExpired;
  }

  private static boolean isValidHour(int hour) {
    return (hour >= 0 && hour <= 23);
  }
}
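The knobs above are ordinary Configuration entries, so they can be supplied through hbase-site.xml, the hbase-compactions.xml resource added in the first hunk, or programmatically. A minimal sketch of programmatic tuning follows; the concrete values are illustrative only, not recommendations from this patch.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;

public class CompactionConfigExample {
  public static Configuration tunedCompactionConf() {
    Configuration conf = HBaseConfiguration.create();
    // Need at least 4 and at most 12 files per minor compaction.
    conf.setInt("hbase.hstore.compaction.min", 4);
    conf.setInt("hbase.hstore.compaction.max", 12);
    // Never pull files larger than 1 GB into a minor compaction.
    conf.setLong("hbase.hstore.compaction.max.size", 1024L * 1024 * 1024);
    // Use a more aggressive selection ratio between 1am and 5am.
    conf.setFloat("hbase.hstore.compaction.ratio", 1.2F);
    conf.setFloat("hbase.hstore.compaction.ratio.offpeak", 5.0F);
    conf.setInt("hbase.offpeak.start.hour", 1);
    conf.setInt("hbase.offpeak.end.hour", 5);
    // Attempt a major compaction roughly once a day, +/- 20% jitter.
    conf.setLong(HConstants.MAJOR_COMPACTION_PERIOD, 24L * 60 * 60 * 1000);
    conf.setFloat("hbase.hregion.majorcompaction.jitter", 0.20F);
    return conf;
  }
}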
@@ -0,0 +1,411 @@
/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.regionserver;

import com.google.common.base.Predicate;
import com.google.common.collect.Collections2;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.regionserver.compactions.CompactSelection;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.util.StringUtils;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.GregorianCalendar;
import java.util.List;
import java.util.Random;

@InterfaceAudience.Private
public class CompactionManager {

  private static final Log LOG = LogFactory.getLog(CompactionManager.class);
  private final static Calendar calendar = new GregorianCalendar();

  private Store store;
  CompactionConfiguration comConf;

  CompactionManager(Configuration configuration, Store store) {
    this.store = store;
    comConf = new CompactionConfiguration(configuration, store);
  }

  /**
   * @param candidateFiles candidate files, ordered from oldest to newest
   * @return subset copy of candidate list that meets compaction criteria
   * @throws java.io.IOException
   */
  CompactSelection selectCompaction(List<StoreFile> candidateFiles, int priority, boolean forceMajor)
      throws IOException {
    // Preliminary compaction subject to filters
    CompactSelection candidateSelection = new CompactSelection(candidateFiles);

    if (!forceMajor) {
      // If there are expired files, only select them so that compaction deletes them
      if (comConf.shouldDeleteExpired() && (store.getTtl() != Long.MAX_VALUE)) {
        CompactSelection expiredSelection = selectExpiredSFs(
          candidateSelection, EnvironmentEdgeManager.currentTimeMillis() - store.getTtl());
        if (expiredSelection != null) {
          return expiredSelection;
        }
      }
      candidateSelection = skipLargeFiles(candidateSelection);
    }

    // Force a major compaction if this is a user-requested major compaction,
    // or if we do not have too many files to compact and this was requested
    // as a major compaction.
    // Or, if there are any references among the candidates.
    boolean isUserCompaction = (priority == Store.PRIORITY_USER);
    boolean majorCompaction = (
      (forceMajor && isUserCompaction)
      || ((forceMajor || isMajorCompaction(candidateSelection.getFilesToCompact()))
          && (candidateSelection.getFilesToCompact().size() < comConf.getMaxFilesToCompact()))
      || store.hasReferences(candidateSelection.getFilesToCompact())
    );

    LOG.debug(store.getHRegion().regionInfo.getEncodedName() + " - " +
      store.getColumnFamilyName() + ": Initiating " +
      (majorCompaction ? "major" : "minor") + " compaction");

    if (!majorCompaction) {
      // we're doing a minor compaction, let's see what files are applicable
      candidateSelection = filterBulk(candidateSelection);
      candidateSelection = applyCompactionPolicy(candidateSelection);
      candidateSelection = checkMinFilesCriteria(candidateSelection);
    }
    candidateSelection = removeExcessFiles(candidateSelection, isUserCompaction, majorCompaction);
    return candidateSelection;
  }

  /**
   * Select the expired store files to compact
   *
   * @param candidates the initial set of storeFiles
   * @param maxExpiredTimeStamp
   *          The store file will be marked as expired if its max time stamp is
   *          less than this maxExpiredTimeStamp.
   * @return A CompactSelection contains the expired store files as
   *         filesToCompact
   */
  private CompactSelection selectExpiredSFs(CompactSelection candidates, long maxExpiredTimeStamp) {
    List<StoreFile> filesToCompact = candidates.getFilesToCompact();
    if (filesToCompact == null || filesToCompact.size() == 0)
      return null;
    ArrayList<StoreFile> expiredStoreFiles = null;
    boolean hasExpiredStoreFiles = false;
    CompactSelection expiredSFSelection = null;

    for (StoreFile storeFile : filesToCompact) {
      if (storeFile.getReader().getMaxTimestamp() < maxExpiredTimeStamp) {
        LOG.info("Deleting the expired store file by compaction: "
          + storeFile.getPath() + " whose maxTimeStamp is "
          + storeFile.getReader().getMaxTimestamp()
          + " while the max expired timestamp is " + maxExpiredTimeStamp);
        if (!hasExpiredStoreFiles) {
          expiredStoreFiles = new ArrayList<StoreFile>();
          hasExpiredStoreFiles = true;
        }
        expiredStoreFiles.add(storeFile);
      }
    }

    if (hasExpiredStoreFiles) {
      expiredSFSelection = new CompactSelection(expiredStoreFiles);
    }
    return expiredSFSelection;
  }

  /**
   * @param candidates pre-filtrate
   * @return filtered subset
   * exclude all files above maxCompactSize
   * Also save all references. We MUST compact them
   */
  private CompactSelection skipLargeFiles(CompactSelection candidates) {
    int pos = 0;
    while (pos < candidates.getFilesToCompact().size() &&
      candidates.getFilesToCompact().get(pos).getReader().length() >
        comConf.getMaxCompactSize() &&
      !candidates.getFilesToCompact().get(pos).isReference()) {
      ++pos;
    }
    if (pos > 0) {
      LOG.debug("Some files are too large. Excluding " + pos + " files from compaction candidates");
      candidates.clearSubList(0, pos);
    }
    return candidates;
  }

  /**
   * @param candidates pre-filtrate
   * @return filtered subset
   * exclude all bulk load files if configured
   */
  private CompactSelection filterBulk(CompactSelection candidates) {
    candidates.getFilesToCompact().removeAll(Collections2.filter(
      candidates.getFilesToCompact(),
      new Predicate<StoreFile>() {
        @Override
        public boolean apply(StoreFile input) {
          return input.excludeFromMinorCompaction();
        }
      }));
    return candidates;
  }

  /**
   * @param candidates pre-filtrate
   * @return filtered subset
   * take up to maxFilesToCompact from the start
   */
  private CompactSelection removeExcessFiles(CompactSelection candidates,
      boolean isUserCompaction, boolean isMajorCompaction) {
    int excess = candidates.getFilesToCompact().size() - comConf.getMaxFilesToCompact();
    if (excess > 0) {
      if (isMajorCompaction && isUserCompaction) {
        LOG.debug("Warning, compacting more than " + comConf.getMaxFilesToCompact() +
          " files because of a user-requested major compaction");
      } else {
        LOG.debug("Too many admissible files. Excluding " + excess
          + " files from compaction candidates");
        candidates.clearSubList(comConf.getMaxFilesToCompact(),
          candidates.getFilesToCompact().size());
      }
    }
    return candidates;
  }

  /**
   * @param candidates pre-filtrate
   * @return filtered subset
   * forget the compactionSelection if we don't have enough files
   */
  private CompactSelection checkMinFilesCriteria(CompactSelection candidates) {
    int minFiles = comConf.getMinFilesToCompact();
    if (candidates.getFilesToCompact().size() < minFiles) {
      if(LOG.isDebugEnabled()) {
        LOG.debug("Not compacting files because we only have " +
          candidates.getFilesToCompact().size() +
          " files ready for compaction. Need " + minFiles + " to initiate.");
      }
      candidates.emptyFileList();
    }
    return candidates;
  }

  /**
   * @param candidates pre-filtrate
   * @return filtered subset
   * -- Default minor compaction selection algorithm: Choose CompactSelection from candidates --
   * First exclude bulk-load files if indicated in configuration.
   * Start at the oldest file and stop when you find the first file that
   * meets compaction criteria:
   *   (1) a recently-flushed, small file (i.e. <= minCompactSize)
   *      OR
   *   (2) within the compactRatio of sum(newer_files)
   * Given normal skew, any newer files will also meet this criteria
   * <p/>
   * Additional Note:
   * If fileSizes.size() >> maxFilesToCompact, we will recurse on
   * compact().  Consider the oldest files first to avoid a
   * situation where we always compact [end-threshold,end).  Then, the
   * last file becomes an aggregate of the previous compactions.
   *
   * normal skew:
   *
   *         older ----> newer (increasing seqID)
   *     _
   *    | |   _
   *    | |  | |   _
   *  --|-|- |-|- |-|---_-------_-------  minCompactSize
   *    | |  | |  | |  | |  _  | |
   *    | |  | |  | |  | | | | | |
   *    | |  | |  | |  | | | | | |
   */
  CompactSelection applyCompactionPolicy(CompactSelection candidates) throws IOException {
    if (candidates.getFilesToCompact().isEmpty()) {
      return candidates;
    }

    // we're doing a minor compaction, let's see what files are applicable
    int start = 0;
    double ratio = comConf.getCompactionRatio();
    if (isOffPeakHour() && candidates.trySetOffpeak()) {
      ratio = comConf.getCompactionRatioOffPeak();
      LOG.info("Running an off-peak compaction, selection ratio = " + ratio
        + ", numOutstandingOffPeakCompactions is now "
        + CompactSelection.getNumOutStandingOffPeakCompactions());
    }

    // get store file sizes for incremental compacting selection.
    int countOfFiles = candidates.getFilesToCompact().size();
    long[] fileSizes = new long[countOfFiles];
    long[] sumSize = new long[countOfFiles];
    for (int i = countOfFiles - 1; i >= 0; --i) {
      StoreFile file = candidates.getFilesToCompact().get(i);
      fileSizes[i] = file.getReader().length();
      // calculate the sum of fileSizes[i,i+maxFilesToCompact-1) for algo
      int tooFar = i + comConf.getMaxFilesToCompact() - 1;
      sumSize[i] = fileSizes[i]
        + ((i + 1 < countOfFiles) ? sumSize[i + 1] : 0)
        - ((tooFar < countOfFiles) ? fileSizes[tooFar] : 0);
    }

    while (countOfFiles - start >= comConf.getMinFilesToCompact() &&
      fileSizes[start] > Math.max(comConf.getMinCompactSize(), (long) (sumSize[start + 1] * ratio))) {
      ++start;
    }
    if (start < countOfFiles) {
      LOG.info("Default compaction algorithm has selected " + (countOfFiles - start)
        + " files from " + countOfFiles + " candidates");
    }

    candidates = candidates.getSubList(start, countOfFiles);

    return candidates;
  }

  /*
   * @param filesToCompact Files to compact. Can be null.
   * @return True if we should run a major compaction.
   */
  boolean isMajorCompaction(final List<StoreFile> filesToCompact) throws IOException {
    boolean result = false;
    long mcTime = getNextMajorCompactTime();
    if (filesToCompact == null || filesToCompact.isEmpty() || mcTime == 0) {
      return result;
    }
    // TODO: Use better method for determining stamp of last major (HBASE-2990)
    long lowTimestamp = getLowestTimestamp(filesToCompact);
    long now = System.currentTimeMillis();
    if (lowTimestamp > 0l && lowTimestamp < (now - mcTime)) {
      // Major compaction time has elapsed.
      if (filesToCompact.size() == 1) {
        // Single file
        StoreFile sf = filesToCompact.get(0);
        long oldest =
            (sf.getReader().timeRangeTracker == null) ?
                Long.MIN_VALUE :
                now - sf.getReader().timeRangeTracker.minimumTimestamp;
        if (sf.isMajorCompaction() &&
            (store.getTtl() == HConstants.FOREVER || oldest < store.getTtl())) {
          if (LOG.isDebugEnabled()) {
            LOG.debug("Skipping major compaction of " + this +
              " because one (major) compacted file only and oldestTime " +
              oldest + "ms is < ttl=" + store.getTtl());
          }
        } else if (store.getTtl() != HConstants.FOREVER && oldest > store.getTtl()) {
          LOG.debug("Major compaction triggered on store " + this +
            ", because keyvalues outdated; time since last major compaction " +
            (now - lowTimestamp) + "ms");
          result = true;
        }
      } else {
        if (LOG.isDebugEnabled()) {
          LOG.debug("Major compaction triggered on store " + this +
            "; time since last major compaction " + (now - lowTimestamp) + "ms");
        }
        result = true;
      }
    }
    return result;
  }

  long getNextMajorCompactTime() {
    // default = 24hrs
    long ret = comConf.getMajorCompactionPeriod();
    String strCompactionTime = store.getFamily().getValue(HConstants.MAJOR_COMPACTION_PERIOD);
    if (strCompactionTime != null) {
      ret = (new Long(strCompactionTime)).longValue();
    }

    if (ret > 0) {
      // default = 20% = +/- 4.8 hrs
      double jitterPct = comConf.getMajorCompactionJitter();
      if (jitterPct > 0) {
        long jitter = Math.round(ret * jitterPct);
        // deterministic jitter avoids a major compaction storm on restart
        Integer seed = store.getDeterministicRandomSeed();
        if (seed != null) {
          double rnd = (new Random(seed)).nextDouble();
          ret += jitter - Math.round(2L * jitter * rnd);
        } else {
          ret = 0; // no storefiles == no major compaction
        }
      }
    }
    return ret;
  }

  /*
   * Gets lowest timestamp from candidate StoreFiles
   *
   * @param fs
   * @param dir
   * @throws IOException
   */
  static long getLowestTimestamp(final List<StoreFile> candidates)
      throws IOException {
    long minTs = Long.MAX_VALUE;
    for (StoreFile storeFile : candidates) {
      minTs = Math.min(minTs, storeFile.getModificationTimeStamp());
    }
    return minTs;
  }

  /**
   * @param compactionSize Total size of some compaction
   * @return whether this should be a large or small compaction
   */
  boolean throttleCompaction(long compactionSize) {
    return compactionSize > comConf.getThrottlePoint();
  }

  /**
   * @param numCandidates Number of candidate store files
   * @return whether a compactionSelection is possible
   */
  boolean needsCompaction(int numCandidates) {
    return numCandidates > comConf.getMinFilesToCompact();
  }

  /**
   * @return whether this is off-peak hour
   */
  private boolean isOffPeakHour() {
    int currentHour = calendar.get(Calendar.HOUR_OF_DAY);
    int startHour = comConf.getOffPeakStartHour();
    int endHour = comConf.getOffPeakEndHour();
    // If offpeak time checking is disabled just return false.
    if (startHour == endHour) {
      return false;
    }
    if (startHour < endHour) {
      return (currentHour >= startHour && currentHour < endHour);
    }
    return (currentHour >= startHour || currentHour < endHour);
  }
}
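The ratio test in applyCompactionPolicy is plain arithmetic over the candidate file sizes, so it can be illustrated without any HBase classes. The sketch below uses hypothetical sizes, the default ratio of 1.2, and a 10 MB minCompactSize; unlike the real method it does not cap the size window at maxFilesToCompact.

public class RatioSelectionSketch {
  // Skip old files larger than max(minCompactSize, ratio * sum(newer files)),
  // stopping at the first file that qualifies -- the same loop shape as above.
  static int selectStart(long[] fileSizes, long minCompactSize, double ratio, int minFiles) {
    int count = fileSizes.length;
    long[] sumSize = new long[count + 1];   // sumSize[i] = total size of files newer than i
    for (int i = count - 1; i >= 0; --i) {
      sumSize[i] = fileSizes[i] + sumSize[i + 1];
    }
    int start = 0;
    while (count - start >= minFiles &&
        fileSizes[start] > Math.max(minCompactSize, (long) (sumSize[start + 1] * ratio))) {
      ++start;
    }
    return start;
  }

  public static void main(String[] args) {
    long mb = 1024L * 1024;
    // Oldest to newest: the 900 MB and 300 MB files fail the ratio test and are skipped.
    long[] sizes = {900 * mb, 300 * mb, 60 * mb, 50 * mb, 12 * mb, 8 * mb};
    System.out.println("selection starts at index "
        + selectStart(sizes, 10 * mb, 1.2, 3));   // prints 2
  }
}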
@@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.hfile.Compression;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.util.StringUtils;

/**

@@ -64,11 +65,15 @@ class Compactor extends Configured {
      final Collection<StoreFile> filesToCompact,
      final boolean majorCompaction, final long maxId)
      throws IOException {
    // Calculate maximum key count after compaction (for blooms)
    // Calculate maximum key count after compaction (for blooms), and minFlushTime after compaction
    // Also calculate earliest put timestamp if major compaction
    int maxKeyCount = 0;
    long minFlushTime = Long.MAX_VALUE;
    long earliestPutTs = HConstants.LATEST_TIMESTAMP;
    for (StoreFile file: filesToCompact) {
      if (file.hasMinFlushTime() && file.getMinFlushTime() < minFlushTime) {
        minFlushTime = file.getMinFlushTime();
      }
      StoreFile.Reader r = file.getReader();
      if (r == null) {
        LOG.warn("Null reader for " + file.getPath());

@@ -194,6 +199,10 @@ class Compactor extends Configured {
        }
      } finally {
        if (writer != null) {
          if (minFlushTime == Long.MAX_VALUE) {
            minFlushTime = StoreFile.NO_MIN_FLUSH_TIME;
          }
          writer.appendFileInfo(StoreFile.MIN_FLUSH_TIME, Bytes.toBytes(minFlushTime));
          writer.appendMetadata(maxId, majorCompaction);
          writer.close();
        }

@@ -4176,7 +4176,7 @@ public class HRegion implements HeapSize { // , Writable{
  }

  /**
   * @return True if needs a mojor compaction.
   * @return True if needs a major compaction.
   * @throws IOException
   */
  boolean isMajorCompaction() throws IOException {
@@ -19,6 +19,7 @@
package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;

@@ -63,9 +64,10 @@ import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.io.hfile.InvalidHFileException;
import org.apache.hadoop.hbase.io.hfile.NoOpDataBlockEncoder;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.regionserver.compactions.CompactSelection;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.compactions.CompactSelection;
import org.apache.hadoop.hbase.regionserver.CompactionManager;
import org.apache.hadoop.hbase.regionserver.metrics.SchemaConfigured;
import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
import org.apache.hadoop.hbase.util.Bytes;

@@ -77,8 +79,6 @@ import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.util.StringUtils;

import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.collect.Collections2;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;

@@ -108,21 +108,24 @@ import com.google.common.collect.Lists;
@InterfaceAudience.Private
public class HStore extends SchemaConfigured implements Store {
  static final Log LOG = LogFactory.getLog(HStore.class);

  /** Parameter name for what compaction manager to use. */
  private static final String COMPACTION_MANAGER_CLASS = "hbase.compactionmanager.class";

  /** Default compaction manager class name. */
  private static final String DEFAULT_COMPACTION_MANAGER_CLASS = CompactionManager.class.getName();

  protected final MemStore memstore;
  // This stores directory in the filesystem.
  private final Path homedir;
  private final HRegion region;
  private final HColumnDescriptor family;
  CompactionManager compactionManager;
  final FileSystem fs;
  final Configuration conf;
  final CacheConfig cacheConf;
  // ttl in milliseconds.
  // ttl in milliseconds. TODO: can this be removed? Already stored in scanInfo.
  private long ttl;
  private final int minFilesToCompact;
  private final int maxFilesToCompact;
  private final long minCompactSize;
  private final long maxCompactSize;
  private long lastCompactSize = 0;
  volatile boolean forceMajor = false;
  /* how many bytes to write between status checks */

@@ -197,7 +200,7 @@ public class HStore extends SchemaConfigured implements Store {

    this.comparator = info.getComparator();
    // Get TTL
    this.ttl = getTTL(family);
    this.ttl = determineTTLFromFamily(family);
    // used by ScanQueryMatcher
    long timeToPurgeDeletes =
      Math.max(conf.getLong("hbase.hstore.time.to.purge.deletes", 0), 0);

@@ -208,23 +211,11 @@ public class HStore extends SchemaConfigured implements Store {
    scanInfo = new ScanInfo(family, ttl, timeToPurgeDeletes, this.comparator);
    this.memstore = new MemStore(conf, this.comparator);

    // By default, compact if storefile.count >= minFilesToCompact
    this.minFilesToCompact = Math.max(2,
      conf.getInt("hbase.hstore.compaction.min",
        /*old name*/ conf.getInt("hbase.hstore.compactionThreshold", 3)));
    LOG.info("hbase.hstore.compaction.min = " + this.minFilesToCompact);

    // Setting up cache configuration for this family
    this.cacheConf = new CacheConfig(conf, family);
    this.blockingStoreFileCount =
      conf.getInt("hbase.hstore.blockingStoreFiles", 7);

    this.maxFilesToCompact = conf.getInt("hbase.hstore.compaction.max", 10);
    this.minCompactSize = conf.getLong("hbase.hstore.compaction.min.size",
      this.region.memstoreFlushSize);
    this.maxCompactSize
      = conf.getLong("hbase.hstore.compaction.max.size", Long.MAX_VALUE);

    this.verifyBulkLoads = conf.getBoolean("hbase.hstore.bulkload.verify", false);

    if (HStore.closeCheckInterval == 0) {
@@ -239,13 +230,53 @@ public class HStore extends SchemaConfigured implements Store {
    this.bytesPerChecksum = getBytesPerChecksum(conf);
    // Create a compaction tool instance
    this.compactor = new Compactor(this.conf);

    setCompactionPolicy(conf.get(COMPACTION_MANAGER_CLASS, DEFAULT_COMPACTION_MANAGER_CLASS));
  }

  /**
   * This setter is used for unit testing
   * TODO: Fix this for online configuration updating
   */
  void setCompactionPolicy(String managerClassName) {
    try {
      Class<? extends CompactionManager> managerClass =
        (Class<? extends CompactionManager>) Class.forName(managerClassName);
      compactionManager = managerClass.getDeclaredConstructor(
        new Class[] {Configuration.class, Store.class } ).newInstance(
        new Object[] { conf, this } );
    } catch (ClassNotFoundException e) {
      throw new UnsupportedOperationException(
        "Unable to find region server interface " + managerClassName, e);
    } catch (IllegalAccessException e) {
      throw new UnsupportedOperationException(
        "Unable to access specified class " + managerClassName, e);
    } catch (InstantiationException e) {
      throw new UnsupportedOperationException(
        "Unable to instantiate specified class " + managerClassName, e);
    } catch (InvocationTargetException e) {
      throw new UnsupportedOperationException(
        "Unable to invoke specified target class constructor " + managerClassName, e);
    } catch (NoSuchMethodException e) {
      throw new UnsupportedOperationException(
        "Unable to find suitable constructor for class " + managerClassName, e);
    }
  }

  @Override
  public Integer getDeterministicRandomSeed() {
    ImmutableList<StoreFile> snapshot = storefiles;
    if (snapshot != null && !snapshot.isEmpty()) {
      return snapshot.get(0).getPath().getName().hashCode();
    }
    return null;
  }

  /**
   * @param family
   * @return
   */
  long getTTL(final HColumnDescriptor family) {
  private static long determineTTLFromFamily(final HColumnDescriptor family) {
    // HCD.getTimeToLive returns ttl in seconds. Convert to milliseconds.
    long ttl = family.getTimeToLive();
    if (ttl == HConstants.FOREVER) {
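Because the manager is instantiated reflectively through a (Configuration, Store) constructor, a subclass in the same package can be swapped in purely via configuration. A hypothetical sketch (the class name and log message are made up for illustration):

package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.regionserver.compactions.CompactSelection;

// Selected by setting hbase.compactionmanager.class to this class name.
public class LoggingCompactionManager extends CompactionManager {
  private static final Log LOG = LogFactory.getLog(LoggingCompactionManager.class);

  LoggingCompactionManager(Configuration conf, Store store) {
    super(conf, store);
  }

  @Override
  CompactSelection applyCompactionPolicy(CompactSelection candidates) throws IOException {
    // Reuse the default ratio-based policy, just report what it kept.
    CompactSelection selected = super.applyCompactionPolicy(candidates);
    LOG.info("Custom policy kept " + selected.getFilesToCompact().size() + " file(s)");
    return selected;
  }
}

A test could set conf.set("hbase.compactionmanager.class", LoggingCompactionManager.class.getName()) before the store is opened, or call updateConfiguration() afterwards (added in a later hunk).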
@@ -280,6 +311,11 @@ public class HStore extends SchemaConfigured implements Store {
    return this.fs;
  }

  public long getTtl() {
    // TTL only applies if there's no MIN_VERSIONs setting on the column.
    return (this.scanInfo.getMinVersions() == 0) ? this.ttl : Long.MAX_VALUE;
  }

  /**
   * Returns the configured bytesPerChecksum value.
   * @param conf The configuration

@@ -771,8 +807,11 @@ public class HStore extends SchemaConfigured implements Store {
      } while (hasMore);
    } finally {
      // Write out the log sequence number that corresponds to this output
      // hfile. The hfile is current up to and including logCacheFlushId.
      // hfile. Also write current time in metadata as minFlushTime.
      // The hfile is current up to and including logCacheFlushId.
      status.setStatus("Flushing " + this + ": appending metadata");
      writer.appendFileInfo(StoreFile.MIN_FLUSH_TIME,
        Bytes.toBytes(EnvironmentEdgeManager.currentTimeMillis()));
      writer.appendMetadata(logCacheFlushId, false);
      status.setStatus("Flushing " + this + ": closing flushed file");
      writer.close();

@@ -1014,12 +1053,12 @@ public class HStore extends SchemaConfigured implements Store {

    // Ready to go. Have list of files to compact.
    LOG.info("Starting compaction of " + filesToCompact.size() + " file(s) in "
      + this + " of "
      + this.region.getRegionInfo().getRegionNameAsString()
      + this + " of " + this.region.getRegionInfo().getRegionNameAsString()
      + " into tmpdir=" + region.getTmpDir() + ", seqid=" + maxId + ", totalSize="
      + StringUtils.humanReadableInt(cr.getSize()));

    StoreFile sf = null;
    long compactionStartTime = EnvironmentEdgeManager.currentTimeMillis();
    try {
      StoreFile.Writer writer =
        this.compactor.compact(this, filesToCompact, cr.isMajor(), maxId);

@@ -1048,8 +1087,11 @@ public class HStore extends SchemaConfigured implements Store {
      (sf == null ? "none" : sf.getPath().getName()) +
      ", size=" + (sf == null ? "none" :
        StringUtils.humanReadableInt(sf.getReader().length()))
      + "; total size for store is "
      + StringUtils.humanReadableInt(storeSize));
      + "; total size for store is " + StringUtils.humanReadableInt(storeSize)
      + ". This selection was in queue for "
      + StringUtils.formatTimeDiff(compactionStartTime, cr.getSelectionTime()) + ", and took "
      + StringUtils.formatTimeDiff(EnvironmentEdgeManager.currentTimeMillis(), compactionStartTime)
      + " to execute.");
    return sf;
  }

@@ -1107,11 +1149,8 @@ public class HStore extends SchemaConfigured implements Store {
    return hasReferences(this.storefiles);
  }

  /*
   * @param files
   * @return True if any of the files in <code>files</code> are References.
   */
  private boolean hasReferences(Collection<StoreFile> files) {
  @Override
  public boolean hasReferences(Collection<StoreFile> files) {
    if (files != null && files.size() > 0) {
      for (StoreFile hsf: files) {
        if (hsf.isReference()) {
@@ -1122,22 +1161,6 @@ public class HStore extends SchemaConfigured implements Store {
    return false;
  }

  /*
   * Gets lowest timestamp from candidate StoreFiles
   *
   * @param fs
   * @param dir
   * @throws IOException
   */
  public static long getLowestTimestamp(final List<StoreFile> candidates)
      throws IOException {
    long minTs = Long.MAX_VALUE;
    for (StoreFile storeFile : candidates) {
      minTs = Math.min(minTs, storeFile.getModificationTimeStamp());
    }
    return minTs;
  }

  @Override
  public CompactionProgress getCompactionProgress() {
    return this.compactor.getProgress();

@@ -1153,91 +1176,7 @@ public class HStore extends SchemaConfigured implements Store {
    }

    List<StoreFile> candidates = new ArrayList<StoreFile>(this.storefiles);

    // exclude files above the max compaction threshold
    // except: save all references. we MUST compact them
    int pos = 0;
    while (pos < candidates.size() &&
      candidates.get(pos).getReader().length() > this.maxCompactSize &&
      !candidates.get(pos).isReference()) ++pos;
    candidates.subList(0, pos).clear();

    return isMajorCompaction(candidates);
  }

  /*
   * @param filesToCompact Files to compact. Can be null.
   * @return True if we should run a major compaction.
   */
  private boolean isMajorCompaction(final List<StoreFile> filesToCompact) throws IOException {
    boolean result = false;
    long mcTime = getNextMajorCompactTime();
    if (filesToCompact == null || filesToCompact.isEmpty() || mcTime == 0) {
      return result;
    }
    // TODO: Use better method for determining stamp of last major (HBASE-2990)
    long lowTimestamp = getLowestTimestamp(filesToCompact);
    long now = System.currentTimeMillis();
    if (lowTimestamp > 0l && lowTimestamp < (now - mcTime)) {
      // Major compaction time has elapsed.
      if (filesToCompact.size() == 1) {
        // Single file
        StoreFile sf = filesToCompact.get(0);
        long oldest =
            (sf.getReader().timeRangeTracker == null) ?
                Long.MIN_VALUE :
                now - sf.getReader().timeRangeTracker.minimumTimestamp;
        if (sf.isMajorCompaction() &&
            (this.ttl == HConstants.FOREVER || oldest < this.ttl)) {
          if (LOG.isDebugEnabled()) {
            LOG.debug("Skipping major compaction of " + this +
              " because one (major) compacted file only and oldestTime " +
              oldest + "ms is < ttl=" + this.ttl);
          }
        } else if (this.ttl != HConstants.FOREVER && oldest > this.ttl) {
          LOG.debug("Major compaction triggered on store " + this +
            ", because keyvalues outdated; time since last major compaction " +
            (now - lowTimestamp) + "ms");
          result = true;
        }
      } else {
        if (LOG.isDebugEnabled()) {
          LOG.debug("Major compaction triggered on store " + this +
            "; time since last major compaction " + (now - lowTimestamp) + "ms");
        }
        result = true;
      }
    }
    return result;
  }

  long getNextMajorCompactTime() {
    // default = 24hrs
    long ret = conf.getLong(HConstants.MAJOR_COMPACTION_PERIOD, 1000*60*60*24);
    if (family.getValue(HConstants.MAJOR_COMPACTION_PERIOD) != null) {
      String strCompactionTime =
        family.getValue(HConstants.MAJOR_COMPACTION_PERIOD);
      ret = (new Long(strCompactionTime)).longValue();
    }

    if (ret > 0) {
      // default = 20% = +/- 4.8 hrs
      double jitterPct = conf.getFloat("hbase.hregion.majorcompaction.jitter",
        0.20F);
      if (jitterPct > 0) {
        long jitter = Math.round(ret * jitterPct);
        // deterministic jitter avoids a major compaction storm on restart
        ImmutableList<StoreFile> snapshot = storefiles;
        if (snapshot != null && !snapshot.isEmpty()) {
          String seed = snapshot.get(0).getPath().getName();
          double curRand = new Random(seed.hashCode()).nextDouble();
          ret += jitter - Math.round(2L * jitter * curRand);
        } else {
          ret = 0; // no storefiles == no major compaction
        }
      }
    }
    return ret;
    return compactionManager.isMajorCompaction(candidates);
  }

  public CompactionRequest requestCompaction() throws IOException {
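The jitter arithmetic shared by the old getNextMajorCompactTime above and the new CompactionManager version spreads the nominal period by plus or minus jitterPct. A self-contained sketch with the default values:

public class MajorCompactionJitterSketch {
  public static void main(String[] args) {
    long period = 24L * 60 * 60 * 1000;        // default major compaction period, 24h in ms
    double jitterPct = 0.20;                   // hbase.hregion.majorcompaction.jitter
    long jitter = Math.round(period * jitterPct);              // 17,280,000 ms = 4.8h
    double rnd = new java.util.Random(42).nextDouble();        // deterministic, seeded per store
    long next = period + jitter - Math.round(2L * jitter * rnd);
    // rnd is in [0, 1), so next falls in (period - jitter, period + jitter], i.e. 24h +/- 4.8h.
    System.out.println("next major compaction due after " + next + " ms");
  }
}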
@@ -1273,9 +1212,10 @@ public class HStore extends SchemaConfigured implements Store {
    CompactSelection filesToCompact;
    if (override) {
      // coprocessor is overriding normal file selection
      filesToCompact = new CompactSelection(conf, candidates);
      filesToCompact = new CompactSelection(candidates);
    } else {
      filesToCompact = compactSelection(candidates, priority);
      filesToCompact = compactionManager.selectCompaction(candidates, priority,
        forceMajor && filesCompacting.isEmpty());
    }

    if (region.getCoprocessorHost() != null) {
@@ -1325,191 +1265,6 @@ public class HStore extends SchemaConfigured implements Store {
      }
    }

  /**
   * Algorithm to choose which files to compact, see {@link #compactSelection(java.util.List, int)}
   * @param candidates
   * @return
   * @throws IOException
   */
  CompactSelection compactSelection(List<StoreFile> candidates) throws IOException {
    return compactSelection(candidates,Store.NO_PRIORITY);
  }

  /**
   * Algorithm to choose which files to compact
   *
   * Configuration knobs:
   *  "hbase.hstore.compaction.ratio"
   *    normal case: minor compact when file <= sum(smaller_files) * ratio
   *  "hbase.hstore.compaction.min.size"
   *    unconditionally compact individual files below this size
   *  "hbase.hstore.compaction.max.size"
   *    never compact individual files above this size (unless splitting)
   *  "hbase.hstore.compaction.min"
   *    min files needed to minor compact
   *  "hbase.hstore.compaction.max"
   *    max files to compact at once (avoids OOM)
   *
   * @param candidates candidate files, ordered from oldest to newest
   * @return subset copy of candidate list that meets compaction criteria
   * @throws IOException
   */
  CompactSelection compactSelection(List<StoreFile> candidates, int priority)
      throws IOException {
    // ASSUMPTION!!! filesCompacting is locked when calling this function

    /* normal skew:
     *
     *         older ----> newer
     *     _
     *    | |   _
     *    | |  | |   _
     *  --|-|- |-|- |-|---_-------_-------  minCompactSize
     *    | |  | |  | |  | |  _  | |
     *    | |  | |  | |  | | | | | |
     *    | |  | |  | |  | | | | | |
     */
    CompactSelection compactSelection = new CompactSelection(conf, candidates);

    boolean forcemajor = this.forceMajor && filesCompacting.isEmpty();
    if (!forcemajor) {
      // Delete the expired store files before the compaction selection.
      if (conf.getBoolean("hbase.store.delete.expired.storefile", true)
          && (ttl != Long.MAX_VALUE) && (this.scanInfo.minVersions == 0)) {
        CompactSelection expiredSelection = compactSelection
          .selectExpiredStoreFilesToCompact(
            EnvironmentEdgeManager.currentTimeMillis() - this.ttl);

        // If there is any expired store files, delete them by compaction.
        if (expiredSelection != null) {
          return expiredSelection;
        }
      }
      // do not compact old files above a configurable threshold
      // save all references. we MUST compact them
      int pos = 0;
      while (pos < compactSelection.getFilesToCompact().size() &&
        compactSelection.getFilesToCompact().get(pos).getReader().length()
          > maxCompactSize &&
        !compactSelection.getFilesToCompact().get(pos).isReference()) ++pos;
      if (pos != 0) compactSelection.clearSubList(0, pos);
    }

    if (compactSelection.getFilesToCompact().isEmpty()) {
      LOG.debug(this.getHRegionInfo().getEncodedName() + " - " +
        this + ": no store files to compact");
      compactSelection.emptyFileList();
      return compactSelection;
    }

    // Force a major compaction if this is a user-requested major compaction,
    // or if we do not have too many files to compact and this was requested
    // as a major compaction
    boolean majorcompaction = (forcemajor && priority == Store.PRIORITY_USER) ||
      (forcemajor || isMajorCompaction(compactSelection.getFilesToCompact())) &&
      (compactSelection.getFilesToCompact().size() < this.maxFilesToCompact
    );
    LOG.debug(this.getHRegionInfo().getEncodedName() + " - " +
      this.getColumnFamilyName() + ": Initiating " +
      (majorcompaction ? "major" : "minor") + "compaction");

    if (!majorcompaction &&
        !hasReferences(compactSelection.getFilesToCompact())) {
      // we're doing a minor compaction, let's see what files are applicable
      int start = 0;
      double r = compactSelection.getCompactSelectionRatio();

      // remove bulk import files that request to be excluded from minors
      compactSelection.getFilesToCompact().removeAll(Collections2.filter(
        compactSelection.getFilesToCompact(),
        new Predicate<StoreFile>() {
          public boolean apply(StoreFile input) {
            return input.excludeFromMinorCompaction();
          }
        }));

      // skip selection algorithm if we don't have enough files
      if (compactSelection.getFilesToCompact().size() < this.minFilesToCompact) {
        if(LOG.isDebugEnabled()) {
          LOG.debug("Not compacting files because we only have " +
            compactSelection.getFilesToCompact().size() +
            " files ready for compaction. Need " + this.minFilesToCompact + " to initiate.");
        }
        compactSelection.emptyFileList();
        return compactSelection;
      }

      /* TODO: add sorting + unit test back in when HBASE-2856 is fixed
      // Sort files by size to correct when normal skew is altered by bulk load.
      Collections.sort(filesToCompact, StoreFile.Comparators.FILE_SIZE);
       */

      // get store file sizes for incremental compacting selection.
      int countOfFiles = compactSelection.getFilesToCompact().size();
      long [] fileSizes = new long[countOfFiles];
      long [] sumSize = new long[countOfFiles];
      for (int i = countOfFiles-1; i >= 0; --i) {
        StoreFile file = compactSelection.getFilesToCompact().get(i);
        fileSizes[i] = file.getReader().length();
        // calculate the sum of fileSizes[i,i+maxFilesToCompact-1) for algo
        int tooFar = i + this.maxFilesToCompact - 1;
        sumSize[i] = fileSizes[i]
          + ((i+1 < countOfFiles) ? sumSize[i+1] : 0)
          - ((tooFar < countOfFiles) ? fileSizes[tooFar] : 0);
      }

      /* Start at the oldest file and stop when you find the first file that
       * meets compaction criteria:
       *   (1) a recently-flushed, small file (i.e. <= minCompactSize)
       *      OR
       *   (2) within the compactRatio of sum(newer_files)
       * Given normal skew, any newer files will also meet this criteria
       *
       * Additional Note:
       * If fileSizes.size() >> maxFilesToCompact, we will recurse on
       * compact().  Consider the oldest files first to avoid a
       * situation where we always compact [end-threshold,end).  Then, the
       * last file becomes an aggregate of the previous compactions.
       */
      while(countOfFiles - start >= this.minFilesToCompact &&
        fileSizes[start] >
          Math.max(minCompactSize, (long)(sumSize[start+1] * r))) {
        ++start;
      }
      int end = Math.min(countOfFiles, start + this.maxFilesToCompact);
      long totalSize = fileSizes[start]
        + ((start+1 < countOfFiles) ? sumSize[start+1] : 0);
      compactSelection = compactSelection.getSubList(start, end);

      // if we don't have enough files to compact, just wait
      if (compactSelection.getFilesToCompact().size() < this.minFilesToCompact) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("Skipped compaction of " + this
            + ". Only " + (end - start) + " file(s) of size "
            + StringUtils.humanReadableInt(totalSize)
            + " have met compaction criteria.");
        }
        compactSelection.emptyFileList();
        return compactSelection;
      }
    } else {
      if(majorcompaction) {
        if (compactSelection.getFilesToCompact().size() > this.maxFilesToCompact) {
          LOG.debug("Warning, compacting more than " + this.maxFilesToCompact +
            " files, probably because of a user-requested major compaction");
          if(priority != Store.PRIORITY_USER) {
            LOG.error("Compacting more than max files on a non user-requested compaction");
          }
        }
      } else if (compactSelection.getFilesToCompact().size() > this.maxFilesToCompact) {
        // all files included in this compaction, up to max
        int pastMax = compactSelection.getFilesToCompact().size() - this.maxFilesToCompact;
        compactSelection.getFilesToCompact().subList(0, pastMax).clear();
      }
    }
    return compactSelection;
  }

  /**
   * Validates a store file by opening and closing it. In HFileV2 this should
   * not be an expensive operation.

@@ -2017,11 +1772,7 @@ public class HStore extends SchemaConfigured implements Store {

  @Override
  public boolean throttleCompaction(long compactionSize) {
    // see HBASE-5867 for discussion on the default
    long throttlePoint = conf.getLong(
      "hbase.regionserver.thread.compaction.throttle",
      2 * this.minFilesToCompact * this.region.memstoreFlushSize);
    return compactionSize > throttlePoint;
    return compactionManager.throttleCompaction(compactionSize);
  }

  @Override

@@ -2116,7 +1867,7 @@ public class HStore extends SchemaConfigured implements Store {

  @Override
  public boolean needsCompaction() {
    return (storefiles.size() - filesCompacting.size()) > minFilesToCompact;
    return compactionManager.needsCompaction(storefiles.size() - filesCompacting.size());
  }

  @Override

@@ -2126,8 +1877,8 @@ public class HStore extends SchemaConfigured implements Store {

  public static final long FIXED_OVERHEAD =
      ClassSize.align(SchemaConfigured.SCHEMA_CONFIGURED_UNALIGNED_HEAP_SIZE +
          + (17 * ClassSize.REFERENCE) + (6 * Bytes.SIZEOF_LONG)
          + (5 * Bytes.SIZEOF_INT) + Bytes.SIZEOF_BOOLEAN);
          + (18 * ClassSize.REFERENCE) + (4 * Bytes.SIZEOF_LONG)
          + (3 * Bytes.SIZEOF_INT) + Bytes.SIZEOF_BOOLEAN);

  public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD
      + ClassSize.OBJECT + ClassSize.REENTRANT_LOCK

@@ -2148,6 +1899,15 @@ public class HStore extends SchemaConfigured implements Store {
    return scanInfo;
  }

  /**
   * Refreshes compaction manager class configuration.
   * Used for tests only - not plumbed thru any layers.
   * TODO: replace when HBASE-3909 is in.
   */
  void updateConfiguration() {
    setCompactionPolicy(conf.get(COMPACTION_MANAGER_CLASS, DEFAULT_COMPACTION_MANAGER_CLASS));
  }

  /**
   * Immutable information for scans over a store.
   */
@@ -18,6 +18,8 @@
package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.Collection;
import java.util.List;
import java.util.NavigableSet;

@@ -204,6 +206,12 @@ public interface Store extends SchemaAware, HeapSize {
   * @return <tt>true</tt> if the store has any underlying reference files to older HFiles
   */
  public boolean hasReferences();

  /*
   * @param files
   * @return True if any of the files in <code>files</code> are References.
   */
  public boolean hasReferences(Collection<StoreFile> files);

  /**
   * @return The size of this store's memstore, in bytes

@@ -267,6 +275,11 @@ public interface Store extends SchemaAware, HeapSize {
   * @return the total size of all Bloom filters in the store
   */
  public long getTotalStaticBloomSize();

  /**
   * Returns the TTL for this store's column family.
   */
  public long getTtl();

  // Test-helper methods

@@ -287,4 +300,10 @@ public interface Store extends SchemaAware, HeapSize {
   * @return the parent region hosting this store
   */
  public HRegion getHRegion();

  /**
   * @return A hash code depending on the state of the current store files.
   * This is used as seed for deterministic random generator for selecting major compaction time
   */
  public Integer getDeterministicRandomSeed();
}
@@ -67,6 +67,7 @@ import org.apache.hadoop.hbase.util.BloomFilter;
import org.apache.hadoop.hbase.util.BloomFilterFactory;
import org.apache.hadoop.hbase.util.BloomFilterWriter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.io.RawComparator;

@@ -114,6 +115,9 @@ public class StoreFile extends SchemaConfigured {
  /** Max Sequence ID in FileInfo */
  public static final byte [] MAX_SEQ_ID_KEY = Bytes.toBytes("MAX_SEQ_ID_KEY");

  /** Min Flush time in FileInfo */
  public static final byte [] MIN_FLUSH_TIME = Bytes.toBytes("MIN_FLUSH_TIME");

  /** Major compaction flag in FileInfo */
  public static final byte[] MAJOR_COMPACTION_KEY =
      Bytes.toBytes("MAJOR_COMPACTION_KEY");

@@ -143,6 +147,9 @@ public class StoreFile extends SchemaConfigured {
  // Need to make it 8k for testing.
  public static final int DEFAULT_BLOCKSIZE_SMALL = 8 * 1024;

  /** Default value for files without minFlushTime in metadata */
  public static final long NO_MIN_FLUSH_TIME = -1;

  private final FileSystem fs;

  // This file's path.

@@ -169,6 +176,8 @@ public class StoreFile extends SchemaConfigured {
  // Keys for metadata stored in backing HFile.
  // Set when we obtain a Reader.
  private long sequenceid = -1;
  // default value is -1, remains -1 if file written without minFlushTime
  private long minFlushTime = NO_MIN_FLUSH_TIME;

  // max of the MemstoreTS in the KV's in this store
  // Set when we obtain a Reader.

@@ -381,6 +390,22 @@ public class StoreFile extends SchemaConfigured {
    return this.sequenceid;
  }

  public boolean hasMinFlushTime() {
    return this.minFlushTime != NO_MIN_FLUSH_TIME;
  }

  public long getMinFlushTime() {
    // BulkLoad files are assumed to contain very old data, return 0
    if (isBulkLoadResult() && getMaxSequenceId() <= 0) {
      return 0;
    } else if (this.minFlushTime == NO_MIN_FLUSH_TIME) {
      // File written without minFlushTime field assume recent data
      return EnvironmentEdgeManager.currentTimeMillis();
    } else {
      return this.minFlushTime;
    }
  }

  public long getModificationTimeStamp() {
    return modificationTimeStamp;
  }

@@ -587,7 +612,10 @@ public class StoreFile extends SchemaConfigured {
        }
      }
    }

    b = metadataMap.get(MIN_FLUSH_TIME);
    if (b != null) {
      this.minFlushTime = Bytes.toLong(b);
    }
    this.reader.setSequenceID(this.sequenceid);

    b = metadataMap.get(HFileWriterV2.MAX_MEMSTORE_TS_KEY);
@ -0,0 +1,267 @@
|
|||
/**
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.regionserver;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
|
||||
import java.text.DecimalFormat;
|
||||
|
||||
/**
|
||||
* Control knobs for default compaction algorithm
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class TierCompactionConfiguration extends CompactionConfiguration {
|
||||
|
||||
private CompactionTier[] compactionTier;
|
||||
private boolean recentFirstOrder;
|
||||
|
||||
TierCompactionConfiguration(Configuration conf, Store store) {
|
||||
super(conf, store);
|
||||
|
||||
String strPrefix = "hbase.hstore.compaction.";
|
||||
String strSchema = "tbl." + store.getHRegion().getTableDesc().getNameAsString()
|
||||
+ "cf." + store.getFamily().getNameAsString() + ".";
|
||||
String strDefault = "Default.";
|
||||
String strAttribute;
|
||||
// If value not set for family, use default family (by passing null).
|
||||
// If default value not set, use 1 tier.
|
||||
|
||||
strAttribute = "NumCompactionTiers";
|
||||
compactionTier = new CompactionTier[
|
||||
conf.getInt(strPrefix + strSchema + strAttribute,
|
||||
conf.getInt(strPrefix + strDefault + strAttribute,
|
||||
1))];
|
||||
|
||||
strAttribute = "IsRecentFirstOrder";
|
||||
recentFirstOrder =
|
||||
conf.getBoolean(strPrefix + strSchema + strAttribute,
|
||||
conf.getBoolean(strPrefix + strDefault + strAttribute,
|
||||
true));
|
||||
|
||||
strAttribute = "MinCompactSize";
|
||||
minCompactSize =
|
||||
conf.getLong(strPrefix + strSchema + strAttribute,
|
||||
conf.getLong(strPrefix + strDefault + strAttribute,
|
||||
0));
|
||||
|
||||
strAttribute = "MaxCompactSize";
|
||||
maxCompactSize =
|
||||
conf.getLong(strPrefix + strSchema + strAttribute,
|
||||
conf.getLong(strPrefix + strDefault + strAttribute,
|
||||
Long.MAX_VALUE));
|
||||
|
||||
strAttribute = "ShouldDeleteExpired";
|
||||
shouldDeleteExpired =
|
||||
conf.getBoolean(strPrefix + strSchema + strAttribute,
|
||||
conf.getBoolean(strPrefix + strDefault + strAttribute,
|
||||
shouldDeleteExpired));
|
||||
|
||||
strAttribute = "ThrottlePoint";
|
||||
throttlePoint =
|
||||
conf.getLong(strPrefix + strSchema + strAttribute,
|
||||
conf.getLong(strPrefix + strDefault + strAttribute,
|
||||
throttlePoint));
|
||||
|
||||
strAttribute = "MajorCompactionPeriod";
|
||||
majorCompactionPeriod =
|
||||
conf.getLong(strPrefix + strSchema + strAttribute,
|
||||
conf.getLong(strPrefix + strDefault + strAttribute,
|
||||
majorCompactionPeriod));
|
||||
|
||||
strAttribute = "MajorCompactionJitter";
|
||||
majorCompactionJitter =
|
||||
conf.getFloat(
|
||||
strPrefix + strSchema + strAttribute,
|
||||
conf.getFloat(
|
||||
strPrefix + strDefault + strAttribute,
|
||||
majorCompactionJitter
|
||||
)
|
||||
);
|
||||
|
||||
for (int i = 0; i < compactionTier.length; i++) {
|
||||
compactionTier[i] = new CompactionTier(i);
|
||||
}
|
||||
}
|
||||
/**
|
||||
* @return Number of compaction Tiers
|
||||
*/
|
||||
int getNumCompactionTiers() {
|
||||
return compactionTier.length;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return The i-th tier from most recent
|
||||
*/
|
||||
CompactionTier getCompactionTier(int i) {
|
||||
return compactionTier[i];
|
||||
}
|
||||
|
||||
/**
|
||||
* @return Whether the tiers will be checked for compaction from newest to oldest
|
||||
*/
|
||||
boolean isRecentFirstOrder() {
|
||||
return recentFirstOrder;
|
||||
}
|
||||
|
||||
/**
|
||||
* Parameters for each tier
|
||||
*/
|
||||
class CompactionTier {
|
||||
|
||||
private long maxAgeInDisk;
|
||||
private long maxSize;
|
||||
private double tierCompactionRatio;
|
||||
private int tierMinFilesToCompact;
|
||||
private int tierMaxFilesToCompact;
|
||||
private int endingIndexForTier;
|
||||
|
||||
CompactionTier(int tier) {
|
||||
String strPrefix = "hbase.hstore.compaction.";
|
||||
String strSchema = "tbl." + store.getHRegion().getTableDesc().getNameAsString()
|
||||
+ "cf." + store.getFamily().getNameAsString() + ".";
|
||||
String strDefault = "Default.";
|
||||
String strDefTier = "";
|
||||
String strTier = "Tier." + String.valueOf(tier) + ".";
|
||||
String strAttribute;
|
||||
|
||||
/**
|
||||
* Use value set for current family, current tier
|
||||
* If not set, use value set for current family, default tier
|
||||
* if not set, use value set for Default family, current tier
|
||||
* If not set, use value set for Default family, default tier
|
||||
* Else just use a default value
|
||||
*/
|
||||
|
||||
strAttribute = "MaxAgeInDisk";
|
||||
maxAgeInDisk =
|
||||
conf.getLong(strPrefix + strSchema + strTier + strAttribute,
|
||||
conf.getLong(strPrefix + strDefault + strTier + strAttribute,
|
||||
Long.MAX_VALUE));
|
||||
|
||||
strAttribute = "MaxSize";
|
||||
maxSize =
|
||||
conf.getLong(strPrefix + strSchema + strTier + strAttribute,
|
||||
conf.getLong(strPrefix + strDefault + strTier + strAttribute,
|
||||
Long.MAX_VALUE));
|
||||
|
||||
strAttribute = "CompactionRatio";
|
||||
tierCompactionRatio = (double)
|
||||
conf.getFloat(strPrefix + strSchema + strTier + strAttribute,
|
||||
conf.getFloat(strPrefix + strSchema + strDefTier + strAttribute,
|
||||
conf.getFloat(strPrefix + strDefault + strTier + strAttribute,
|
||||
conf.getFloat(strPrefix + strDefault + strDefTier + strAttribute,
|
||||
(float) compactionRatio))));
|
||||
|
||||
strAttribute = "MinFilesToCompact";
|
||||
tierMinFilesToCompact =
|
||||
conf.getInt(strPrefix + strSchema + strTier + strAttribute,
|
||||
conf.getInt(strPrefix + strSchema + strDefTier + strAttribute,
|
||||
conf.getInt(strPrefix + strDefault + strTier + strAttribute,
|
||||
conf.getInt(strPrefix + strDefault + strDefTier + strAttribute,
|
||||
minFilesToCompact))));
|
||||
|
||||
strAttribute = "MaxFilesToCompact";
|
||||
tierMaxFilesToCompact =
|
||||
conf.getInt(strPrefix + strSchema + strTier + strAttribute,
|
||||
conf.getInt(strPrefix + strSchema + strDefTier + strAttribute,
|
||||
conf.getInt(strPrefix + strDefault + strTier + strAttribute,
|
||||
conf.getInt(strPrefix + strDefault + strDefTier + strAttribute,
|
||||
maxFilesToCompact))));
|
||||
|
||||
strAttribute = "EndingIndexForTier";
|
||||
endingIndexForTier =
|
||||
conf.getInt(strPrefix + strSchema + strTier + strAttribute,
|
||||
conf.getInt(strPrefix + strDefault + strTier + strAttribute,
|
||||
tier));
|
||||
|
||||
//make sure this value is not incorrectly set
|
||||
if (endingIndexForTier < 0 || endingIndexForTier > tier) {
|
||||
LOG.error("EndingIndexForTier improperly set. Using default value.");
|
||||
endingIndexForTier = tier;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* @return Upper bound on the storeFile's age on disk (now minus minFlushTime) for it to be included in this tier
|
||||
*/
|
||||
long getMaxAgeInDisk() {
|
||||
return maxAgeInDisk;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return Upper bound on storeFile's size to be included in this tier
|
||||
*/
|
||||
long getMaxSize() {
|
||||
return maxSize;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return Compaction ratio for selections of this tier
|
||||
*/
|
||||
double getCompactionRatio() {
|
||||
return tierCompactionRatio;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return lower bound on number of files in selections of this tier
|
||||
*/
|
||||
int getMinFilesToCompact() {
|
||||
return tierMinFilesToCompact;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return upper bound on number of files in selections of this tier
|
||||
*/
|
||||
int getMaxFilesToCompact() {
|
||||
return tierMaxFilesToCompact;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the newest tier which will also be included in selections of this tier
|
||||
* by default it is the index of this tier, must be between 0 and this tier
|
||||
*/
|
||||
int getEndingIndexForTier() {
|
||||
return endingIndexForTier;
|
||||
}
|
||||
|
||||
String getDescription() {
|
||||
String ageString = "INF";
|
||||
String sizeString = "INF";
|
||||
if (getMaxAgeInDisk() < Long.MAX_VALUE) {
|
||||
ageString = StringUtils.formatTime(getMaxAgeInDisk());
|
||||
}
|
||||
if (getMaxSize() < Long.MAX_VALUE) {
|
||||
sizeString = StringUtils.humanReadableInt(getMaxSize());
|
||||
}
|
||||
String ret = "Has files upto age " + ageString
|
||||
+ " and upto size " + sizeString + ". "
|
||||
+ "Compaction ratio: " + (new DecimalFormat("#.##")).format(getCompactionRatio()) + ", "
|
||||
+ "Compaction Selection with at least " + getMinFilesToCompact() + " and "
|
||||
+ "at most " + getMaxFilesToCompact() + " files possible, "
|
||||
+ "Selections in this tier includes files up to tier " + getEndingIndexForTier();
|
||||
return ret;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
|
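For orientation, the ratio and file-count attributes above are resolved from the most specific configuration key to the least specific one before falling back to the store-level value (MaxAgeInDisk, MaxSize and EndingIndexForTier probe only the family and Default scopes). A small sketch of that probe order, using placeholder table and family names rather than anything from the patch:

public class TierKeyOrderSketch {
  public static void main(String[] args) {
    String prefix = "hbase.hstore.compaction.";
    String schema = "tbl.<table>cf.<family>.";   // family scope string as built in the constructor above
    String attr   = "CompactionRatio";
    String[] probeOrder = {
        prefix + schema + "Tier.1." + attr,   // this family, this tier
        prefix + schema + attr,               // this family, default tier
        prefix + "Default.Tier.1." + attr,    // Default family, this tier
        prefix + "Default." + attr,           // Default family, default tier
    };
    for (String key : probeOrder) {
      System.out.println(key);   // the first key that is set wins; otherwise the store-level ratio applies
    }
  }
}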
@ -0,0 +1,256 @@
|
|||
/**
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.regionserver;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.CompactSelection;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
@InterfaceAudience.Private
|
||||
public class TierCompactionManager extends CompactionManager {
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(TierCompactionManager.class);
|
||||
|
||||
private int[] endInTier;
|
||||
private int[] tierOf;
|
||||
|
||||
private TierCompactionConfiguration tierConf;
|
||||
|
||||
TierCompactionManager(Configuration configuration, Store store) {
|
||||
super(configuration, store);
|
||||
comConf = new TierCompactionConfiguration(configuration, store);
|
||||
tierConf = (TierCompactionConfiguration) comConf;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param candidates pre-filtered list of candidate store files
|
||||
* @return filtered subset
|
||||
* -- Tier Based minor compaction selection algorithm: Choose CompactSelection from candidates --
|
||||
* <p/>
|
||||
* First exclude bulk-load files if indicated in configuration.
|
||||
* Arrange files from oldest to newest then select an appropriate ['start','end') pair
|
||||
* try 'start' from oldest to newest (smallest to largest fileIndex)
|
||||
* for each value, identify the 'end' fileIndex
|
||||
* stop when the range ['start','end') is an admissible compaction
|
||||
* <p/>
|
||||
* Notes:
|
||||
* <p/>
|
||||
* a compaction is admissible if
|
||||
* - file fileSize[start] is at most maxCompactSize AND
|
||||
* - number of files is at least currentTier.minFilesToCompact AND
|
||||
* - (fileSize[start] is at most ratio times the rest of the files in the compaction OR
|
||||
* - fileSize[start] is at most minCompactSize)
|
||||
* <p/>
|
||||
* end is endInTier[tierOf[start].endingIndexForTier]
|
||||
* By default currentTier.endingIndexForTier = currentTier, so in the default
|
||||
* case 'end' is always 1 + the last fileIndex in currentTier, making sure
|
||||
* files from different tiers are never selected together.
|
||||
* normal skew:
|
||||
*
|
||||
* older ----> newer (increasing seqID, increasing minFlushTime)
|
||||
*
|
||||
* Tier 2 | Tier 1 | Tier 0
|
||||
* | |
|
||||
* _ | |
|
||||
* | | | _ |
|
||||
* | | | | | _ |
|
||||
* --|-|-|-|-|- |-|-|--_-------_------- minCompactSize
|
||||
* | | | | | | | | | | _ | |
|
||||
* | | | | | | | | | | | | | |
|
||||
* | | | | | | | | | | | | | |
|
||||
*/
|
||||
@Override
|
||||
CompactSelection applyCompactionPolicy(CompactSelection candidates) throws IOException {
|
||||
// we're doing a minor compaction, let's see what files are applicable
|
||||
int start = -1;
|
||||
int end = -1;
|
||||
|
||||
// skip the selection algorithm if there are no candidate files
|
||||
if (candidates.getFilesToCompact().isEmpty()) {
|
||||
candidates.emptyFileList();
|
||||
return candidates;
|
||||
}
|
||||
|
||||
// get store file sizes for incremental compacting selection.
|
||||
int countOfFiles = candidates.getFilesToCompact().size();
|
||||
long[] fileSizes = new long[countOfFiles];
|
||||
StoreFile file;
|
||||
long[] sumSize = new long[countOfFiles + 1];
|
||||
sumSize[countOfFiles] = 0;
|
||||
for (int i = countOfFiles - 1; i >= 0; --i) {
|
||||
file = candidates.getFilesToCompact().get(i);
|
||||
fileSizes[i] = file.getReader().length();
|
||||
// calculate the suffix sum fileSizes[i] + ... + fileSizes[countOfFiles - 1] for the algorithm
|
||||
sumSize[i] = fileSizes[i] + sumSize[i + 1];
|
||||
}
|
||||
|
||||
/**
|
||||
* divide into tiers:
|
||||
* assign tierOf[fileIndex] = tierIndex
|
||||
* assign endInTier[tierIndex] = 1 + index of the last file in tierIndex
|
||||
*/
|
||||
// Backward compatibility - if files with indices < i don't have minFlushTime field, then
|
||||
// all of them get tierOf[i]. If no file has minFlushTime, all get tier zero.
|
||||
int numTiers = tierConf.getNumCompactionTiers();
|
||||
TierCompactionConfiguration.CompactionTier tier;
|
||||
tierOf = new int[countOfFiles];
|
||||
endInTier = new int[numTiers + 1];
|
||||
endInTier[numTiers] = 0;
|
||||
|
||||
LOG.info("Applying TierCompactionPolicy with " + countOfFiles + " files");
|
||||
|
||||
int i;
|
||||
int j = countOfFiles;
|
||||
|
||||
for (i = 0; i < numTiers; i++) {
|
||||
tier = tierConf.getCompactionTier(i);
|
||||
endInTier[i] = j;
|
||||
while (j > 0) {
|
||||
file = candidates.getFilesToCompact().get(j - 1);
|
||||
if (!isInTier(file, tier)) {
|
||||
break;
|
||||
}
|
||||
j--;
|
||||
tierOf[j] = i;
|
||||
}
|
||||
}
|
||||
|
||||
long restSize;
|
||||
double ratio;
|
||||
|
||||
//Main algorithm
|
||||
for (j = 0; j < countOfFiles; j++) {
|
||||
start = next(start);
|
||||
tier = tierConf.getCompactionTier(tierOf[start]);
|
||||
end = endInTier[tier.getEndingIndexForTier()];
|
||||
restSize = sumSize[start + 1] - sumSize[end];
|
||||
ratio = tier.getCompactionRatio();
|
||||
if (fileSizes[start] <= tierConf.getMaxCompactSize() &&
|
||||
end - start >= tier.getMinFilesToCompact() &&
|
||||
(fileSizes[start] <= tierConf.getMinCompactSize() ||
|
||||
(fileSizes[start] <= restSize * ratio))) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
String tab = " ";
|
||||
for (i = 0; i < numTiers; i++) {
|
||||
LOG.info("Tier " + i + " : " + tierConf.getCompactionTier(i).getDescription());
|
||||
if (endInTier[i] == endInTier[i+1]) {
|
||||
LOG.info(tab + "No file is assigned to this tier.");
|
||||
} else {
|
||||
LOG.info(tab + (endInTier[i] - endInTier[i+1])
|
||||
+ " file(s) are assigned to this tier with serial number(s) "
|
||||
+ endInTier[i + 1] + " to " + (endInTier[i] - 1));
|
||||
}
|
||||
for (j = endInTier[i + 1]; j < endInTier[i]; j++) {
|
||||
file = candidates.getFilesToCompact().get(j);
|
||||
LOG.info(tab + tab + "SeqID = " + file.getMaxSequenceId()
|
||||
+ ", Age = " + StringUtils.formatTimeDiff(
|
||||
EnvironmentEdgeManager.currentTimeMillis(), file.getMinFlushTime())
|
||||
+ ", Size = " + StringUtils.humanReadableInt(fileSizes[j])
|
||||
+ ", Path = " + file.getPath());
|
||||
}
|
||||
}
|
||||
if (start < countOfFiles) {
|
||||
end = Math.min(end, start
|
||||
+ tierConf.getCompactionTier(tierOf[start]).getMaxFilesToCompact());
|
||||
}
|
||||
if (start < end) {
|
||||
String strTier = String.valueOf(tierOf[start]);
|
||||
if (tierOf[end - 1] != tierOf[start]) {
|
||||
strTier += " to " + tierOf[end - 1];
|
||||
}
|
||||
LOG.info("Tier Based compaction algorithm has selected " + (end - start)
|
||||
+ " files from tier " + strTier + " out of " + countOfFiles + " candidates");
|
||||
}
|
||||
|
||||
candidates = candidates.getSubList(start, end);
|
||||
return candidates;
|
||||
}
|
||||
|
||||
private boolean isInTier(StoreFile file, TierCompactionConfiguration.CompactionTier tier) {
|
||||
return file.getReader().length() <= tier.getMaxSize() &&
|
||||
EnvironmentEdgeManager.currentTimeMillis()-file.getMinFlushTime() <= tier.getMaxAgeInDisk();
|
||||
}
|
||||
|
||||
/**
|
||||
* This function iterates over the start values in order.
|
||||
* Whenever an admissible compaction is found, we return the selection.
|
||||
* Hence the order is important if there is more than one admissible compaction.
|
||||
* @param start current Value
|
||||
* @return next Value
|
||||
*/
|
||||
private int next(int start) {
|
||||
if (tierConf.isRecentFirstOrder()) {
|
||||
return backNext(start);
|
||||
}
|
||||
return fwdNext(start);
|
||||
}
|
||||
|
||||
/**
|
||||
* This function iterates over the start values in newer-first order of tiers,
|
||||
* but older-first order of files within a tier.
|
||||
* For example, suppose the tiers are:
|
||||
* Tier 3 - files 0,1,2
|
||||
* Tier 2 - files 3,4
|
||||
* Tier 1 - no files
|
||||
* Tier 0 - files 5,6,7
|
||||
* Then the order of 'start' files will be:
|
||||
* 5,6,7,3,4,0,1,2
|
||||
* @param start current Value
|
||||
* @return next Value
|
||||
*/
|
||||
private int backNext(int start) {
|
||||
int tier = 0;
|
||||
if (start == -1) {
|
||||
while (endInTier[tier] >= endInTier[0]) {
|
||||
tier++;
|
||||
}
|
||||
return endInTier[tier];
|
||||
}
|
||||
tier = tierOf[start];
|
||||
if (endInTier[tier] == start + 1) {
|
||||
tier++;
|
||||
start = endInTier[tier];
|
||||
while (endInTier[tier] >= start) {
|
||||
tier++;
|
||||
}
|
||||
return endInTier[tier];
|
||||
}
|
||||
return start + 1;
|
||||
}
|
||||
|
||||
/**
|
||||
* This function iterates over the start values in older-first order of files.
|
||||
* @param start current Value
|
||||
* @return next Value
|
||||
*/
|
||||
private int fwdNext(int start) {
|
||||
return start + 1;
|
||||
}
|
||||
|
||||
}
|
|
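The main loop above keeps advancing 'start' until the range it implies is admissible. The test it applies can be restated as a standalone sketch; the method, class name and sample numbers below are illustrative and not part of the patch.

public class AdmissibilitySketch {
  // The selection [start, end) is admissible if the oldest file is not too big,
  // there are enough files, and the oldest file passes the ratio (or minCompactSize) test.
  static boolean isAdmissible(long startFileSize, long restSize, int numFiles,
                              long maxCompactSize, long minCompactSize,
                              int minFilesToCompact, double ratio) {
    return startFileSize <= maxCompactSize
        && numFiles >= minFilesToCompact
        && (startFileSize <= minCompactSize || startFileSize <= restSize * ratio);
  }

  public static void main(String[] args) {
    // 60 is larger than ratio * restSize (1.0 * 34), but it is below minCompactSize 100,
    // so with 3 files the selection is admissible.
    System.out.println(isAdmissible(60, 34, 3, 10000, 100, 3, 1.0));
  }
}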
@ -19,15 +19,13 @@
|
|||
package org.apache.hadoop.hbase.regionserver.compactions;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Calendar;
|
||||
import java.util.GregorianCalendar;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.regionserver.StoreFile;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
|
||||
@InterfaceAudience.Private
|
||||
public class CompactSelection {
|
||||
|
@ -48,37 +46,15 @@ public class CompactSelection {
|
|||
*/
|
||||
private final static Object compactionCountLock = new Object();
|
||||
|
||||
// HBase conf object
|
||||
Configuration conf;
|
||||
// was this compaction promoted to an off-peak
|
||||
boolean isOffPeakCompaction = false;
|
||||
// compactRatio: double on purpose! Float.MAX < Long.MAX < Double.MAX
|
||||
// With float, java will downcast your long to float for comparisons (bad)
|
||||
private double compactRatio;
|
||||
// compaction ratio off-peak
|
||||
private double compactRatioOffPeak;
|
||||
// offpeak start time
|
||||
private int offPeakStartHour = -1;
|
||||
// off peak end time
|
||||
private int offPeakEndHour = -1;
|
||||
// CompactSelection object creation time.
|
||||
private final long selectionTime;
|
||||
|
||||
public CompactSelection(Configuration conf, List<StoreFile> filesToCompact) {
|
||||
public CompactSelection(List<StoreFile> filesToCompact) {
|
||||
this.selectionTime = EnvironmentEdgeManager.currentTimeMillis();
|
||||
this.filesToCompact = filesToCompact;
|
||||
this.conf = conf;
|
||||
this.compactRatio = conf.getFloat("hbase.hstore.compaction.ratio", 1.2F);
|
||||
this.compactRatioOffPeak = conf.getFloat("hbase.hstore.compaction.ratio.offpeak", 5.0F);
|
||||
|
||||
// Peak time is from [offPeakStartHour, offPeakEndHour). Valid numbers are [0, 23]
|
||||
this.offPeakStartHour = conf.getInt("hbase.offpeak.start.hour", -1);
|
||||
this.offPeakEndHour = conf.getInt("hbase.offpeak.end.hour", -1);
|
||||
if (!isValidHour(this.offPeakStartHour) || !isValidHour(this.offPeakEndHour)) {
|
||||
if (!(this.offPeakStartHour == -1 && this.offPeakEndHour == -1)) {
|
||||
LOG.warn("Invalid start/end hour for peak hour : start = " +
|
||||
this.offPeakStartHour + " end = " + this.offPeakEndHour +
|
||||
". Valid numbers are [0-23]");
|
||||
}
|
||||
this.offPeakStartHour = this.offPeakEndHour = -1;
|
||||
}
|
||||
this.isOffPeakCompaction = false;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -113,49 +89,25 @@ public class CompactSelection {
|
|||
}
|
||||
|
||||
if (hasExpiredStoreFiles) {
|
||||
expiredSFSelection = new CompactSelection(conf, expiredStoreFiles);
|
||||
expiredSFSelection = new CompactSelection(expiredStoreFiles);
|
||||
}
|
||||
return expiredSFSelection;
|
||||
}
|
||||
|
||||
/**
|
||||
* If the current hour falls in the off peak times and there are no
|
||||
* outstanding off peak compactions, the current compaction is
|
||||
* promoted to an off peak compaction. Currently only one off peak
|
||||
* compaction is present in the compaction queue.
|
||||
*
|
||||
* @param currentHour
|
||||
* @return
|
||||
*/
|
||||
public double getCompactSelectionRatio() {
|
||||
double r = this.compactRatio;
|
||||
synchronized(compactionCountLock) {
|
||||
if (isOffPeakHour() && numOutstandingOffPeakCompactions == 0) {
|
||||
r = this.compactRatioOffPeak;
|
||||
numOutstandingOffPeakCompactions++;
|
||||
isOffPeakCompaction = true;
|
||||
}
|
||||
}
|
||||
if(isOffPeakCompaction) {
|
||||
LOG.info("Running an off-peak compaction, selection ratio = " +
|
||||
compactRatioOffPeak + ", numOutstandingOffPeakCompactions is now " +
|
||||
numOutstandingOffPeakCompactions);
|
||||
}
|
||||
return r;
|
||||
}
|
||||
|
||||
/**
|
||||
* The current compaction finished, so reset the off peak compactions count
|
||||
* if this was an off peak compaction.
|
||||
*/
|
||||
public void finishRequest() {
|
||||
if (isOffPeakCompaction) {
|
||||
long newValueToLog = -1;
|
||||
synchronized(compactionCountLock) {
|
||||
numOutstandingOffPeakCompactions--;
|
||||
assert !isOffPeakCompaction : "Double-counting off-peak count for compaction";
|
||||
newValueToLog = --numOutstandingOffPeakCompactions;
|
||||
isOffPeakCompaction = false;
|
||||
}
|
||||
LOG.info("Compaction done, numOutstandingOffPeakCompactions is now " +
|
||||
numOutstandingOffPeakCompactions);
|
||||
newValueToLog);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -170,13 +122,14 @@ public class CompactSelection {
|
|||
public void emptyFileList() {
|
||||
filesToCompact.clear();
|
||||
if (isOffPeakCompaction) {
|
||||
long newValueToLog = -1;
|
||||
synchronized(compactionCountLock) {
|
||||
// reset the off peak count
|
||||
numOutstandingOffPeakCompactions--;
|
||||
newValueToLog = --numOutstandingOffPeakCompactions;
|
||||
isOffPeakCompaction = false;
|
||||
}
|
||||
LOG.info("Nothing to compact, numOutstandingOffPeakCompactions is now " +
|
||||
numOutstandingOffPeakCompactions);
|
||||
newValueToLog);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -184,16 +137,30 @@ public class CompactSelection {
|
|||
return this.isOffPeakCompaction;
|
||||
}
|
||||
|
||||
private boolean isOffPeakHour() {
|
||||
int currentHour = (new GregorianCalendar()).get(Calendar.HOUR_OF_DAY);
|
||||
// If offpeak time checking is disabled just return false.
|
||||
if (this.offPeakStartHour == this.offPeakEndHour) {
|
||||
return false;
|
||||
public static long getNumOutStandingOffPeakCompactions() {
|
||||
synchronized(compactionCountLock) {
|
||||
return numOutstandingOffPeakCompactions;
|
||||
}
|
||||
if (this.offPeakStartHour < this.offPeakEndHour) {
|
||||
return (currentHour >= this.offPeakStartHour && currentHour < this.offPeakEndHour);
|
||||
}
|
||||
|
||||
/**
|
||||
* Tries making the compaction off-peak.
|
||||
* Only checks internal compaction constraints, not timing.
|
||||
* @return Eventual value of isOffPeakCompaction.
|
||||
*/
|
||||
public boolean trySetOffpeak() {
|
||||
assert !isOffPeakCompaction : "Double-setting off-peak for compaction " + this;
|
||||
synchronized(compactionCountLock) {
|
||||
if (numOutstandingOffPeakCompactions == 0) {
|
||||
numOutstandingOffPeakCompactions++;
|
||||
isOffPeakCompaction = true;
|
||||
}
|
||||
}
|
||||
return (currentHour >= this.offPeakStartHour || currentHour < this.offPeakEndHour);
|
||||
return isOffPeakCompaction;
|
||||
}
|
||||
|
||||
public long getSelectionTime() {
|
||||
return selectionTime;
|
||||
}
|
||||
|
||||
public CompactSelection subList(int start, int end) {
|
||||
|
|
|
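The new trySetOffpeak()/finishRequest() pair above replaces the promotion that used to happen inside getCompactSelectionRatio(). A hedged sketch of how a caller would pair the two is shown below; the wrapper class and method are illustrative, and in the patch itself the promotion decision sits with the compaction manager while the request's finally block (seen in the CompactionRequest hunk that follows) performs the release.

import org.apache.hadoop.hbase.regionserver.compactions.CompactSelection;

public class OffPeakPairingSketch {
  // Promote at most one outstanding compaction to off-peak and always release
  // the slot when the request finishes.
  static void compactWithOffPeakAccounting(CompactSelection selection, boolean inOffPeakWindow) {
    if (inOffPeakWindow) {
      selection.trySetOffpeak();   // no-op unless the off-peak slot is free
    }
    try {
      // ... run the compaction over selection.getFilesToCompact() ...
    } finally {
      selection.finishRequest();   // decrements the counter only if this selection was promoted
    }
  }
}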
@ -208,6 +208,10 @@ public class CompactionRequest implements Comparable<CompactionRequest>,
|
|||
return p;
|
||||
}
|
||||
|
||||
public long getSelectionTime() {
|
||||
return compactSelection.getSelectionTime();
|
||||
}
|
||||
|
||||
/** Gets the priority for the request */
|
||||
public void setPriority(int p) {
|
||||
this.p = p;
|
||||
|
@ -272,7 +276,7 @@ public class CompactionRequest implements Comparable<CompactionRequest>,
|
|||
server.checkFileSystem();
|
||||
} finally {
|
||||
s.finishRequest(this);
|
||||
LOG.debug("CompactSplitThread status: " + server.compactSplitThread);
|
||||
LOG.debug("CompactSplitThread Status: " + server.compactSplitThread);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,160 @@
|
|||
<?xml version="1.0"?>
|
||||
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
|
||||
<!--
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
-->
|
||||
<configuration>
|
||||
<property>
|
||||
<name>hbase.hstore.compactionThreshold</name>
|
||||
<value>3</value>
|
||||
<description>
|
||||
If more than this number of HStoreFiles in any one HStore
|
||||
(one HStoreFile is written per flush of memstore) then a compaction
|
||||
is run to rewrite all HStoreFiles as one. Larger numbers
|
||||
put off compaction but when it runs, it takes longer to complete.
|
||||
During a compaction, updates cannot be flushed to disk. Long
|
||||
compactions require memory sufficient to carry the logging of
|
||||
all updates across the duration of the compaction.
|
||||
|
||||
If too large, clients timeout during compaction.
|
||||
</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>hbase.hstore.compaction.max</name>
|
||||
<value>10</value>
|
||||
<description>Max number of HStoreFiles to compact per 'minor' compaction.
|
||||
</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>hbase.hregion.majorcompaction</name>
|
||||
<value>86400000</value>
|
||||
<description>The time (in milliseconds) between 'major' compactions of all
|
||||
HStoreFiles in a region. Default: 1 day.
|
||||
Set to 0 to disable automated major compactions.
|
||||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>hbase.hstore.compaction.CompactionPolicy</name>
|
||||
<value>TierBasedCompactionPolicy</value>
|
||||
<description>The compaction policy which should be used
|
||||
</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>hbase.hstore.compaction.tbl.cluster_test.cf.test_cf.NumCompactionTiers</name>
|
||||
<value>4</value>
|
||||
<description>The number of tiers into which the files are assigned
|
||||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>hbase.hstore.compaction.Default.Tier.0.MaxAgeInDisk</name>
|
||||
<value>3600000</value>
|
||||
<description>Maximum age of a flushed file for it to stay in the first tier;
|
||||
value one hour.
|
||||
</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>hbase.hstore.compaction.tbl.cluster_test.cf.test_cf.Tier.1.MaxAgeInDisk</name>
|
||||
<value>10800000</value>
|
||||
<description>Maximum age of a file to be in second tier
|
||||
value 3 hours.
|
||||
</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>hbase.hstore.compaction.Default.Tier.2.MaxAgeInDisk</name>
|
||||
<value>36000000</value>
|
||||
<description>Maximum age of a file to be in third tier
|
||||
value 10 hours
|
||||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>hbase.hstore.compaction.Default.CompactionRatio</name>
|
||||
<value>0.0</value>
|
||||
<description>The default compaction ratio used if unspecified.
|
||||
value 0.
|
||||
</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>hbase.hstore.compaction.Default.Tier.1.CompactionRatio</name>
|
||||
<value>1.0</value>
|
||||
<description>The compaction ratio for the second tier.
|
||||
value 1.0.
|
||||
</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>hbase.hstore.compaction.Default.Tier.2.CompactionRatio</name>
|
||||
<value>0.75</value>
|
||||
<description>The compaction ratio for the third tier.
|
||||
value 0.75.
|
||||
</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>hbase.hstore.compaction.Default.Tier.3.CompactionRatio</name>
|
||||
<value>0.2</value>
|
||||
<description>The compaction ratio for the fourth tier.
|
||||
value 0.2.
|
||||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>hbase.hstore.compaction.min</name>
|
||||
<value>2</value>
|
||||
<description>Default minimum number of files to compact
|
||||
value 2.
|
||||
</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>hbase.hstore.compaction.tbl.cluster_test.cf.MinFilesToCompact</name>
|
||||
<value>3</value>
|
||||
<description>Overridden Default minimum number of files to compact
|
||||
value 3.
|
||||
</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>hbase.hstore.compaction.max</name>
|
||||
<value>7</value>
|
||||
<description>Default maximum number of files to compact
|
||||
value 7.
|
||||
</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>hbase.hstore.compaction.Default.Tier.1.MinFilesToCompact</name>
|
||||
<value>2</value>
|
||||
<description>minimum number of files to compact in second tier
|
||||
value 2.
|
||||
</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>hbase.hstore.compaction.Default.Tier.3.MaxFilesToCompact</name>
|
||||
<value>6</value>
|
||||
<description>maximum number of files to compact in fourth tier
|
||||
value 6.
|
||||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>hbase.hstore.compaction.Default.Tier.2.EndingIndexForTier</name>
|
||||
<value>1</value>
|
||||
<description>The newest tier whose files may also be included in selections from this tier;
|
||||
value 1.
|
||||
</description>
|
||||
</property>
|
||||
</configuration>
|
|
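A quick way to see how the family-scoped keys above interact with the Default scope is to read them back through a bare Hadoop Configuration. This is a sketch only; the second table and family name are placeholders that have no override in the file.

import org.apache.hadoop.conf.Configuration;

public class CompactionConfSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration(false);
    conf.addResource("hbase-compactions.xml");   // the file introduced in this change

    String prefix = "hbase.hstore.compaction.";
    // Family with an explicit override (see the property above): 4 tiers.
    int tiersForTestCf = conf.getInt(
        prefix + "tbl.cluster_test.cf.test_cf.NumCompactionTiers",
        conf.getInt(prefix + "Default.NumCompactionTiers", 1));
    // Family with no override: falls back to the default of 1 tier.
    int tiersForOtherCf = conf.getInt(
        prefix + "tbl.other_table.cf.other_cf.NumCompactionTiers",
        conf.getInt(prefix + "Default.NumCompactionTiers", 1));
    System.out.println(tiersForTestCf + " vs " + tiersForOtherCf);  // prints "4 vs 1"
  }
}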
@ -303,6 +303,8 @@ public class TestCompaction extends HBaseTestCase {
|
|||
conf.setFloat("hbase.hregion.majorcompaction.jitter", jitterPct);
|
||||
|
||||
HStore s = ((HStore) r.getStore(COLUMN_FAMILY));
|
||||
// TODO: temporary call, until HBASE-3909 is committed in some form.
|
||||
s.updateConfiguration();
|
||||
try {
|
||||
createStoreFile(r);
|
||||
createStoreFile(r);
|
||||
|
@ -314,9 +316,10 @@ public class TestCompaction extends HBaseTestCase {
|
|||
assertEquals(2, s.getStorefilesCount());
|
||||
|
||||
// ensure that major compaction time is deterministic
|
||||
long mcTime = s.getNextMajorCompactTime();
|
||||
CompactionManager c = s.compactionManager;
|
||||
long mcTime = c.getNextMajorCompactTime();
|
||||
for (int i = 0; i < 10; ++i) {
|
||||
assertEquals(mcTime, s.getNextMajorCompactTime());
|
||||
assertEquals(mcTime, c.getNextMajorCompactTime());
|
||||
}
|
||||
|
||||
// ensure that the major compaction time is within the variance
|
||||
|
|
|
@ -1,5 +1,4 @@
|
|||
/**
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
|
@ -26,6 +25,7 @@ import java.util.GregorianCalendar;
|
|||
import java.util.List;
|
||||
|
||||
import junit.framework.TestCase;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
@ -39,26 +39,27 @@ import org.apache.hadoop.hbase.regionserver.compactions.CompactSelection;
|
|||
import org.apache.hadoop.hbase.regionserver.wal.HLog;
|
||||
import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
||||
@Category(SmallTests.class)
|
||||
public class TestCompactSelection extends TestCase {
|
||||
private final static Log LOG = LogFactory.getLog(TestCompactSelection.class);
|
||||
public class TestDefaultCompactSelection extends TestCase {
|
||||
private final static Log LOG = LogFactory.getLog(TestDefaultCompactSelection.class);
|
||||
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
|
||||
|
||||
private Configuration conf;
|
||||
private HStore store;
|
||||
protected Configuration conf;
|
||||
protected HStore store;
|
||||
private static final String DIR=
|
||||
TEST_UTIL.getDataTestDir("TestCompactSelection").toString();
|
||||
private static Path TEST_FILE;
|
||||
private CompactionManager manager;
|
||||
|
||||
private static final int minFiles = 3;
|
||||
private static final int maxFiles = 5;
|
||||
protected static final int minFiles = 3;
|
||||
protected static final int maxFiles = 5;
|
||||
|
||||
private static final long minSize = 10;
|
||||
private static final long maxSize = 1000;
|
||||
protected static final long minSize = 10;
|
||||
protected static final long maxSize = 1000;
|
||||
|
||||
|
||||
@Override
|
||||
|
@ -94,6 +95,8 @@ public class TestCompactSelection extends TestCase {
|
|||
region = new HRegion(tableDir, hlog, fs, conf, info, htd, null);
|
||||
|
||||
store = new HStore(basedir, region, hcd, fs, conf);
|
||||
manager = store.compactionManager;
|
||||
|
||||
TEST_FILE = StoreFile.getRandomFilename(fs, store.getHomedir());
|
||||
fs.create(TEST_FILE);
|
||||
}
|
||||
|
@ -102,20 +105,41 @@ public class TestCompactSelection extends TestCase {
|
|||
static class MockStoreFile extends StoreFile {
|
||||
long length = 0;
|
||||
boolean isRef = false;
|
||||
long ageInDisk;
|
||||
long sequenceid;
|
||||
|
||||
MockStoreFile(long length, boolean isRef) throws IOException {
|
||||
super(TEST_UTIL.getTestFileSystem(), TEST_FILE,
|
||||
TEST_UTIL.getConfiguration(),
|
||||
MockStoreFile(long length, long ageInDisk, boolean isRef, long sequenceid) throws IOException {
|
||||
super(TEST_UTIL.getTestFileSystem(), TEST_FILE, TEST_UTIL.getConfiguration(),
|
||||
new CacheConfig(TEST_UTIL.getConfiguration()), BloomType.NONE,
|
||||
NoOpDataBlockEncoder.INSTANCE);
|
||||
this.length = length;
|
||||
this.isRef = isRef;
|
||||
this.isRef = isRef;
|
||||
this.ageInDisk = ageInDisk;
|
||||
this.sequenceid = sequenceid;
|
||||
}
|
||||
|
||||
void setLength(long newLen) {
|
||||
this.length = newLen;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean hasMinFlushTime() {
|
||||
return ageInDisk != 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getMinFlushTime() {
|
||||
if (ageInDisk < 0) {
|
||||
return ageInDisk;
|
||||
}
|
||||
return EnvironmentEdgeManager.currentTimeMillis() - ageInDisk;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getMaxSequenceId() {
|
||||
return sequenceid;
|
||||
}
|
||||
|
||||
@Override
|
||||
boolean isMajorCompaction() {
|
||||
return false;
|
||||
|
@ -138,43 +162,70 @@ public class TestCompactSelection extends TestCase {
|
|||
}
|
||||
}
|
||||
|
||||
List<StoreFile> sfCreate(long ... sizes) throws IOException {
|
||||
return sfCreate(false, sizes);
|
||||
ArrayList<Long> toArrayList(long... numbers) {
|
||||
ArrayList<Long> result = new ArrayList<Long>();
|
||||
for (long i : numbers) {
|
||||
result.add(i);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
List<StoreFile> sfCreate(boolean isReference, long ... sizes)
|
||||
throws IOException {
|
||||
List<StoreFile> sfCreate(long... sizes) throws IOException {
|
||||
ArrayList<Long> ageInDisk = new ArrayList<Long>();
|
||||
for (int i = 0; i < sizes.length; i++) {
|
||||
ageInDisk.add(0L);
|
||||
}
|
||||
return sfCreate(toArrayList(sizes), ageInDisk);
|
||||
}
|
||||
|
||||
List<StoreFile> sfCreate(ArrayList<Long> sizes, ArrayList<Long> ageInDisk)
|
||||
throws IOException {
|
||||
return sfCreate(false, sizes, ageInDisk);
|
||||
}
|
||||
|
||||
List<StoreFile> sfCreate(boolean isReference, long... sizes) throws IOException {
|
||||
ArrayList<Long> ageInDisk = new ArrayList<Long>(sizes.length);
|
||||
for (int i = 0; i < sizes.length; i++) {
|
||||
ageInDisk.add(0L);
|
||||
}
|
||||
return sfCreate(isReference, toArrayList(sizes), ageInDisk);
|
||||
}
|
||||
|
||||
List<StoreFile> sfCreate(boolean isReference, ArrayList<Long> sizes, ArrayList<Long> ageInDisk)
|
||||
throws IOException {
|
||||
List<StoreFile> ret = Lists.newArrayList();
|
||||
for (long i : sizes) {
|
||||
ret.add(new MockStoreFile(i, isReference));
|
||||
for (int i = 0; i < sizes.size(); i++) {
|
||||
ret.add(new MockStoreFile(sizes.get(i), ageInDisk.get(i), isReference, i));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
long[] getSizes(List<StoreFile> sfList) {
|
||||
long[] aNums = new long[sfList.size()];
|
||||
for (int i=0; i <sfList.size(); ++i) {
|
||||
for (int i = 0; i < sfList.size(); ++i) {
|
||||
aNums[i] = sfList.get(i).getReader().length();
|
||||
}
|
||||
return aNums;
|
||||
}
|
||||
|
||||
void compactEquals(List<StoreFile> candidates, long ... expected)
|
||||
throws IOException {
|
||||
|
||||
void compactEquals(List<StoreFile> candidates, long... expected)
|
||||
throws IOException {
|
||||
compactEquals(candidates, false, expected);
|
||||
}
|
||||
|
||||
void compactEquals(List<StoreFile> candidates, boolean forcemajor,
|
||||
void compactEquals(List<StoreFile> candidates, boolean forcemajor,
|
||||
long ... expected)
|
||||
throws IOException {
|
||||
store.forceMajor = forcemajor;
|
||||
List<StoreFile> actual = store.compactSelection(candidates).getFilesToCompact();
|
||||
store.forceMajor = false;
|
||||
//Test Default compactions
|
||||
List<StoreFile> actual = store.compactionManager
|
||||
.selectCompaction(candidates, Store.NO_PRIORITY, forcemajor).getFilesToCompact();
|
||||
assertEquals(Arrays.toString(expected), Arrays.toString(getSizes(actual)));
|
||||
store.forceMajor = false;
|
||||
}
|
||||
|
||||
public void testCompactionRatio() throws IOException {
|
||||
/*
|
||||
/**
|
||||
* NOTE: these tests are specific to describe the implementation of the
|
||||
* current compaction algorithm. Developed to ensure that refactoring
|
||||
* doesn't implicitly alter this.
|
||||
|
@ -191,17 +242,15 @@ public class TestCompactSelection extends TestCase {
|
|||
compactEquals(sfCreate(tooBig, tooBig, 700,700) /* empty */);
|
||||
// small files = don't care about ratio
|
||||
compactEquals(sfCreate(8,3,1), 8,3,1);
|
||||
/* TODO: add sorting + unit test back in when HBASE-2856 is fixed
|
||||
// sort first so you don't include huge file the tail end
|
||||
/* TODO: add sorting + unit test back in when HBASE-2856 is fixed
|
||||
// sort first so you don't include huge file the tail end.
|
||||
// happens with HFileOutputFormat bulk migration
|
||||
compactEquals(sfCreate(100,50,23,12,12, 500), 23, 12, 12);
|
||||
*/
|
||||
// don't exceed max file compact threshold
|
||||
assertEquals(maxFiles,
|
||||
store.compactSelection(sfCreate(7,6,5,4,3,2,1)).getFilesToCompact().size());
|
||||
// note: file selection starts with largest to smallest.
|
||||
compactEquals(sfCreate(7, 6, 5, 4, 3, 2, 1), 7, 6, 5, 4, 3);
|
||||
|
||||
|
||||
/* MAJOR COMPACTION */
|
||||
// if a major compaction has been forced, then compact everything
|
||||
compactEquals(sfCreate(50,25,12,12), true, 50, 25, 12, 12);
|
||||
|
@ -211,15 +260,18 @@ public class TestCompactSelection extends TestCase {
|
|||
compactEquals(sfCreate(tooBig, 12,12), true, tooBig, 12, 12);
|
||||
// don't exceed max file compact threshold, even with major compaction
|
||||
store.forceMajor = true;
|
||||
assertEquals(maxFiles,
|
||||
manager.selectCompaction(sfCreate(7, 6, 5, 4, 3, 2, 1), Store.NO_PRIORITY, false)
|
||||
.getFilesToCompact().size());
|
||||
compactEquals(sfCreate(7, 6, 5, 4, 3, 2, 1), 7, 6, 5, 4, 3);
|
||||
store.forceMajor = false;
|
||||
|
||||
// if we exceed maxCompactSize, downgrade to minor
|
||||
// if not, it creates a 'snowball effect' when files >> maxCompactSize:
|
||||
// the last file in compaction is the aggregate of all previous compactions
|
||||
compactEquals(sfCreate(100,50,23,12,12), true, 23, 12, 12);
|
||||
conf.setLong(HConstants.MAJOR_COMPACTION_PERIOD, 1);
|
||||
conf.setFloat("hbase.hregion.majorcompaction.jitter", 0);
|
||||
store.updateConfiguration();
|
||||
try {
|
||||
// trigger an aged major compaction
|
||||
compactEquals(sfCreate(50,25,12,12), 50, 25, 12, 12);
|
||||
|
@ -236,15 +288,12 @@ public class TestCompactSelection extends TestCase {
|
|||
// reference files shouldn't obey max threshold
|
||||
compactEquals(sfCreate(true, tooBig, 12,12), tooBig, 12, 12);
|
||||
// reference files should obey max file compact to avoid OOM
|
||||
assertEquals(maxFiles,
|
||||
store.compactSelection(sfCreate(true, 7,6,5,4,3,2,1)).getFilesToCompact().size());
|
||||
// reference compaction
|
||||
compactEquals(sfCreate(true, 7, 6, 5, 4, 3, 2, 1), 5, 4, 3, 2, 1);
|
||||
|
||||
compactEquals(sfCreate(true, 7, 6, 5, 4, 3, 2, 1), 7, 6, 5, 4, 3);
|
||||
|
||||
// empty case
|
||||
compactEquals(new ArrayList<StoreFile>() /* empty */);
|
||||
// empty case (because all files are too big)
|
||||
compactEquals(sfCreate(tooBig, tooBig) /* empty */);
|
||||
compactEquals(sfCreate(tooBig, tooBig) /* empty */);
|
||||
}
|
||||
|
||||
public void testOffPeakCompactionRatio() throws IOException {
|
||||
|
@ -258,7 +307,7 @@ public class TestCompactSelection extends TestCase {
|
|||
Calendar calendar = new GregorianCalendar();
|
||||
int hourOfDay = calendar.get(Calendar.HOUR_OF_DAY);
|
||||
LOG.debug("Hour of day = " + hourOfDay);
|
||||
int hourPlusOne = ((hourOfDay+1+24)%24);
|
||||
int hourPlusOne = ((hourOfDay+1)%24);
|
||||
int hourMinusOne = ((hourOfDay-1+24)%24);
|
||||
int hourMinusTwo = ((hourOfDay-2+24)%24);
|
||||
|
||||
|
@ -274,15 +323,16 @@ public class TestCompactSelection extends TestCase {
|
|||
this.conf.setLong("hbase.offpeak.end.hour", hourPlusOne);
|
||||
LOG.debug("Testing compact selection with off-peak settings (" +
|
||||
hourMinusOne + ", " + hourPlusOne + ")");
|
||||
compactEquals(sfCreate(999,50,12,12, 1), 50, 12, 12, 1);
|
||||
// update the compaction policy to include conf changes
|
||||
store.setCompactionPolicy(CompactionManager.class.getName());
|
||||
compactEquals(sfCreate(999, 50, 12, 12, 1), 50, 12, 12, 1);
|
||||
|
||||
// set peak hour outside current selection and check compact selection
|
||||
this.conf.setLong("hbase.offpeak.start.hour", hourMinusTwo);
|
||||
this.conf.setLong("hbase.offpeak.end.hour", hourMinusOne);
|
||||
store.setCompactionPolicy(CompactionManager.class.getName());
|
||||
LOG.debug("Testing compact selection with off-peak settings (" +
|
||||
hourMinusTwo + ", " + hourMinusOne + ")");
|
||||
compactEquals(sfCreate(999,50,12,12, 1), 12, 12, 1);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -216,17 +216,15 @@ public class TestStore extends TestCase {
|
|||
flush(i);
|
||||
}
|
||||
// after flush; check the lowest time stamp
|
||||
long lowestTimeStampFromStore =
|
||||
HStore.getLowestTimestamp(store.getStorefiles());
|
||||
long lowestTimeStampFromFS =
|
||||
getLowestTimeStampFromFS(fs,store.getStorefiles());
|
||||
assertEquals(lowestTimeStampFromStore,lowestTimeStampFromFS);
|
||||
|
||||
long lowestTimeStampFromManager = CompactionManager.getLowestTimestamp(store.getStorefiles());
|
||||
long lowestTimeStampFromFS = getLowestTimeStampFromFS(fs, store.getStorefiles());
|
||||
assertEquals(lowestTimeStampFromManager,lowestTimeStampFromFS);
|
||||
|
||||
// after compact; check the lowest time stamp
|
||||
store.compact(store.requestCompaction());
|
||||
lowestTimeStampFromStore = HStore.getLowestTimestamp(store.getStorefiles());
|
||||
lowestTimeStampFromFS = getLowestTimeStampFromFS(fs,store.getStorefiles());
|
||||
assertEquals(lowestTimeStampFromStore,lowestTimeStampFromFS);
|
||||
lowestTimeStampFromManager = CompactionManager.getLowestTimestamp(store.getStorefiles());
|
||||
lowestTimeStampFromFS = getLowestTimeStampFromFS(fs, store.getStorefiles());
|
||||
assertEquals(lowestTimeStampFromManager, lowestTimeStampFromFS);
|
||||
}
|
||||
|
||||
private static long getLowestTimeStampFromFS(FileSystem fs,
|
||||
|
|
|
@ -0,0 +1,318 @@
|
|||
/**
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.regionserver;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.*;
|
||||
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.SmallTests;
|
||||
|
||||
@Category(SmallTests.class)
|
||||
public class TestTierCompactSelection extends TestDefaultCompactSelection {
|
||||
private final static Log LOG = LogFactory.getLog(TestTierCompactSelection.class);
|
||||
|
||||
private static final int numTiers = 4;
|
||||
|
||||
private String strPrefix, strSchema, strTier;
|
||||
|
||||
|
||||
@Override
|
||||
public void setUp() throws Exception {
|
||||
|
||||
super.setUp();
|
||||
|
||||
// setup config values necessary for store
|
||||
strPrefix = "hbase.hstore.compaction.";
|
||||
strSchema = "tbl." + store.getHRegion().getTableDesc().getNameAsString()
|
||||
+ "cf." + store.getFamily().getNameAsString() + ".";
|
||||
|
||||
this.conf.setStrings(strPrefix + "CompactionPolicy", "TierBasedCompactionPolicy");
|
||||
|
||||
this.conf.setLong(HConstants.MAJOR_COMPACTION_PERIOD, 0);
|
||||
|
||||
// The following parameters are for default compaction
|
||||
// Some of them are used as default values of tier based compaction
|
||||
this.conf.setInt(strPrefix + "min", 2);
|
||||
this.conf.setInt(strPrefix + "max", 10);
|
||||
this.conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 0);
|
||||
this.conf.setLong(strPrefix + "max.size", 10000);
|
||||
this.conf.setFloat(strPrefix + "ratio", 10.0F);
|
||||
|
||||
// Specifying the family parameters here
|
||||
conf.setInt(strPrefix + strSchema + "NumCompactionTiers", numTiers);
|
||||
conf.setLong(strPrefix + strSchema + "MinCompactSize", minSize);
|
||||
conf.setLong(strPrefix + strSchema + "MaxCompactSize", maxSize);
|
||||
|
||||
// Specifying parameters for the default tier
|
||||
strTier = "";
|
||||
conf.setFloat(strPrefix + strSchema + strTier + "CompactionRatio", 0.1F);
|
||||
conf.setInt(strPrefix + strSchema + strTier + "MinFilesToCompact", minFiles);
|
||||
conf.setInt(strPrefix + strSchema + strTier + "MaxFilesToCompact", maxFiles);
|
||||
|
||||
// Specifying parameters for individual tiers here
|
||||
|
||||
// Don't compact in this tier (likely to be in block cache)
|
||||
strTier = "Tier.0.";
|
||||
conf.setFloat(strPrefix + strSchema + strTier + "CompactionRatio", 0.0F);
|
||||
|
||||
// Most aggressive tier
|
||||
strTier = "Tier.1.";
|
||||
conf.setFloat(strPrefix + strSchema + strTier + "CompactionRatio", 2.0F);
|
||||
conf.setInt(strPrefix + strSchema + strTier + "MinFilesToCompact", 2);
|
||||
conf.setInt(strPrefix + strSchema + strTier + "MaxFilesToCompact", 10);
|
||||
|
||||
// Medium tier
|
||||
strTier = "Tier.2.";
|
||||
conf.setFloat(strPrefix + strSchema + strTier + "CompactionRatio", 1.0F);
|
||||
// Also include files in tier 1 here
|
||||
conf.setInt(strPrefix + strSchema + strTier + "EndingIndexForTier", 1);
|
||||
|
||||
// Last tier - least aggressive compaction
|
||||
// has default tier settings only
|
||||
// Max Time elapsed is Infinity by default
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
void compactEquals(
|
||||
List<StoreFile> candidates, boolean forcemajor,
|
||||
long... expected
|
||||
)
|
||||
throws IOException {
|
||||
store.forceMajor = forcemajor;
|
||||
// update the policy now in case the configuration has changed
|
||||
store.setCompactionPolicy(TierCompactionManager.class.getName());
|
||||
List<StoreFile> actual =
|
||||
store.compactionManager.selectCompaction(candidates, Store.NO_PRIORITY, forcemajor).getFilesToCompact();
|
||||
store.forceMajor = false;
|
||||
assertEquals(Arrays.toString(expected), Arrays.toString(getSizes(actual)));
|
||||
}
|
||||
|
||||
public void testAgeBasedAssignment() throws IOException {
|
||||
|
||||
conf.setLong(strPrefix + strSchema + "Tier.0.MaxAgeInDisk", 10L);
|
||||
conf.setLong(strPrefix + strSchema + "Tier.1.MaxAgeInDisk", 100L);
|
||||
conf.setLong(strPrefix + strSchema + "Tier.2.MaxAgeInDisk", 1000L);
|
||||
conf.setLong(strPrefix + strSchema + "Tier.0.MaxSize", Long.MAX_VALUE);
|
||||
conf.setLong(strPrefix + strSchema + "Tier.1.MaxSize", Long.MAX_VALUE);
|
||||
conf.setLong(strPrefix + strSchema + "Tier.2.MaxSize", Long.MAX_VALUE);
|
||||
|
||||
//everything in first tier, don't compact!
|
||||
compactEquals(sfCreate(toArrayList(
|
||||
151, 30, 13, 12, 11 ), toArrayList( // Sizes
|
||||
8, 5, 4, 2, 1 )) // ageInDisk ( = currentTime - minFlushTime)
|
||||
/* empty expected */ ); // Selected sizes
|
||||
|
||||
//below minSize should compact
|
||||
compactEquals(sfCreate(toArrayList(
|
||||
12, 11, 8, 3, 1 ), toArrayList(
|
||||
8, 5, 4, 2, 1 )),
|
||||
8, 3, 1 );
|
||||
|
||||
//everything in second tier
|
||||
compactEquals(sfCreate(toArrayList(
|
||||
251, 70, 13, 12, 11 ), toArrayList(
|
||||
80, 50, 40, 20, 11 )),
|
||||
70, 13, 12, 11 );
|
||||
|
||||
//everything in third tier
|
||||
compactEquals(sfCreate(toArrayList(
|
||||
251, 70, 13, 12, 11 ), toArrayList(
|
||||
800, 500, 400, 200, 110 )),
|
||||
13, 12, 11 );
|
||||
|
||||
//everything in fourth tier
|
||||
compactEquals(sfCreate(toArrayList(
|
||||
251, 70, 13, 12, 11 ), toArrayList(
|
||||
8000, 5000, 4000, 2000, 1100 ))
|
||||
/* empty expected */ );
|
||||
|
||||
//Valid compaction in 4th tier with ratio 0.10, hits maxFilesToCompact
|
||||
compactEquals(sfCreate(toArrayList(
|
||||
500, 93, 92, 91, 90, 89, 88, 87, 86, 85, 84, 83, 82, 81, 80 ), toArrayList(
|
||||
5094, 5093, 5092, 5091, 5090, 5089, 5088, 5087, 5086, 5085, 5084, 5083, 5082, 5081, 5080)),
|
||||
93, 92, 91, 90, 89 );
|
||||
|
||||
//Now mixing tiers 1,0, expected selection in tier 1 only
|
||||
compactEquals(sfCreate(toArrayList(
|
||||
999, 110, 100, 12, 11 ), toArrayList(
|
||||
90, 80, 50, 4, 1 )),
|
||||
110, 100 );
|
||||
|
||||
//Mixing tier 2,1, expected selection in tier 2 including tier 1 but not zero
|
||||
compactEquals(sfCreate(toArrayList(
|
||||
999, 110, 100, 12, 11 ), toArrayList(
|
||||
900, 800, 500, 40, 1 )),
|
||||
110, 100, 12 );
|
||||
|
||||
//Mixing tier 2,1, expected selection in tier 1 because of recentFirstOrder = true
|
||||
compactEquals(sfCreate(toArrayList(
|
||||
999, 110, 100, 12, 13, 11 ), toArrayList(
|
||||
900, 800, 500, 40, 30, 1 )),
|
||||
12, 13 );
|
||||
|
||||
conf.setBoolean(strPrefix + strSchema + "IsRecentFirstOrder", false);
|
||||
|
||||
//Mixing tier 2,1, expected selection in tier 2 (including tier 1) because recentFirstOrder = false
|
||||
compactEquals(sfCreate(toArrayList(
|
||||
999, 110, 100, 12, 13, 11 ), toArrayList(
|
||||
900, 800, 500, 40, 30, 1 )),
|
||||
110, 100, 12, 13 );
|
||||
|
||||
//Mixing all tier 3,2,1,0 expected selection in tier 1 only
|
||||
compactEquals(sfCreate(toArrayList(
|
||||
999, 800, 110, 100, 12, 13, 11 ), toArrayList(
|
||||
9000, 800, 50, 40, 8, 3, 1 )),
|
||||
110, 100 );
|
||||
|
||||
//Checking backward compatibility, first 3 files don't have minFlushTime,
|
||||
//all should go to tier 1, not tier 0
|
||||
compactEquals(sfCreate(toArrayList(
|
||||
999, 800, 110, 100, 12, 13, 11 ), toArrayList(
|
||||
0, 0, 0, 40, 8, 3, 1 )),
|
||||
999, 800, 110, 100 );
|
||||
|
||||
//make sure too big files don't get compacted
|
||||
compactEquals(sfCreate(toArrayList(
|
||||
1002, 1001, 999, 800, 700, 12, 13, 11 ), toArrayList(
|
||||
900, 80, 50, 40, 30, 20, 4, 2 )),
|
||||
999, 800, 700, 12 );
|
||||
|
||||
}
|
||||
|
||||
public void testSizeBasedAssignment() throws IOException {
|
||||
|
||||
conf.setLong(strPrefix + strSchema + "MinCompactSize", 3);
|
||||
|
||||
conf.setLong(strPrefix + strSchema + "Tier.0.MaxSize", 10L);
|
||||
conf.setLong(strPrefix + strSchema + "Tier.1.MaxSize", 100L);
|
||||
conf.setLong(strPrefix + strSchema + "Tier.2.MaxSize", 1000L);
|
||||
conf.setLong(strPrefix + strSchema + "Tier.0.MaxAgeInDisk", Long.MAX_VALUE);
|
||||
conf.setLong(strPrefix + strSchema + "Tier.1.MaxAgeInDisk", Long.MAX_VALUE);
|
||||
conf.setLong(strPrefix + strSchema + "Tier.2.MaxAgeInDisk", Long.MAX_VALUE);
|
||||
|
||||
compactEquals(sfCreate(false,
|
||||
500, 3, 2, 1 ),
|
||||
3, 2, 1 );
|
||||
|
||||
compactEquals(sfCreate(false,
|
||||
500, 8, 7, 6, 5, 4, 2, 1 )
|
||||
/* empty */ );
|
||||
|
||||
compactEquals(sfCreate(false,
|
||||
500, 6, 8, 4, 7, 4, 2, 1 )
|
||||
/* empty */ );
|
||||
|
||||
compactEquals(sfCreate(false,
|
||||
500, 23, 11, 8, 4, 1 )
|
||||
/* empty */ );
|
||||
|
||||
compactEquals(sfCreate(false,
|
||||
500, 11, 23, 8, 4, 1 ),
|
||||
11, 23 );
|
||||
|
||||
compactEquals(sfCreate(false,
|
||||
500, 9, 23, 8, 4, 1 ),
|
||||
9, 23 );
|
||||
|
||||
compactEquals(sfCreate(false,
|
||||
500, 70, 23, 11, 8, 4, 1 )
|
||||
/* empty */ );
|
||||
|
||||
compactEquals(sfCreate(false,
|
||||
500, 60, 23, 11, 8, 4, 1 ),
|
||||
60, 23, 11 );
|
||||
|
||||
compactEquals(sfCreate(false,
|
||||
500, 90, 60, 23, 11, 8, 4, 1 ),
|
||||
90, 60, 23, 11 );
|
||||
|
||||
conf.setBoolean(strPrefix + strSchema + "IsRecentFirstOrder", false);
|
||||
|
||||
compactEquals(sfCreate(false,
|
||||
500, 450, 60, 23, 11, 8, 4, 1 ),
|
||||
500, 450, 60, 23, 11 );
|
||||
|
||||
compactEquals(sfCreate(false,
|
||||
450, 500, 60, 23, 11, 8, 4, 1 ),
|
||||
450, 500, 60, 23, 11 );
|
||||
|
||||
compactEquals(sfCreate(false,
|
||||
1013, 1012, 1011, 1010, 1009, 1008, 1007, 1006, 1005, 1004, 1003, 1002, 1001, 999, 450, 550 ),
|
||||
999, 450, 550 );
|
||||
|
||||
conf.setLong(strPrefix + strSchema + "MaxCompactSize", 10000);
|
||||
|
||||
compactEquals(sfCreate(false,
|
||||
1013, 1012, 1011, 1010, 1009, 1008, 1007, 1006, 1005, 1004, 1003, 1002, 1001, 999, 450, 550 ),
|
||||
1013, 1012, 1011, 1010, 1009 );
|
||||
|
||||
compactEquals(sfCreate(false,
|
||||
1013, 992, 1011, 1010, 1009, 1008, 1007, 1006, 1005, 1004, 1003, 1002, 1001, 999, 450, 550),
|
||||
1013, 992, 1011, 1010, 1009 );
|
||||
|
||||
compactEquals(sfCreate(false,
|
||||
992, 993, 1011, 990, 1009, 998, 1007, 996, 1005, 994, 1003, 992, 1001, 999, 450, 550 ),
|
||||
992, 993, 1011, 990, 1009 );
|
||||
|
||||
conf.setBoolean(strPrefix + strSchema + "IsRecentFirstOrder", true);
|
||||
|
||||
compactEquals(sfCreate(false,
|
||||
500, 450, 60, 23, 11, 8, 4, 1 ),
|
||||
60, 23, 11 );
|
||||
|
||||
compactEquals(sfCreate(false,
|
||||
450, 500, 60, 23, 11, 8, 4, 1 ),
|
||||
60, 23, 11 );
|
||||
|
||||
compactEquals(sfCreate(false,
|
||||
1013, 1012, 1011, 1010, 1009, 1008, 1007, 1006, 1005, 1004, 1003, 1002, 1001, 999, 450, 550 ),
|
||||
999, 450, 550 );
|
||||
|
||||
compactEquals(sfCreate(false,
|
||||
992, 993, 1011, 990, 1009, 998, 1007, 996, 1005, 994, 1003, 992, 1001, 999, 450, 550 ),
|
||||
999, 450, 550 );
|
||||
|
||||
compactEquals(sfCreate(false,
|
||||
992, 993, 1011, 990, 1009, 998, 1007, 996, 1005, 994, 1003, 992, 991, 999, 450, 550 ),
|
||||
992, 991, 999, 450, 550 );
|
||||
|
||||
compactEquals(sfCreate(false,
|
||||
992, 993, 1011, 990, 1009, 998, 1007, 996, 1005, 994, 1003, 992, 991, 999, 450, 550, 1001),
|
||||
992, 993, 1011, 990, 1009 );
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void testCompactionRatio() throws IOException {
|
||||
conf.setInt(strPrefix + strSchema + "NumCompactionTiers", 1);
|
||||
conf.setFloat(strPrefix + strSchema + "Tier.0.CompactionRatio", 1.0F);
|
||||
conf.setInt(strPrefix + "max", 5);
|
||||
super.testCompactionRatio();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void testOffPeakCompactionRatio() throws IOException {}
|
||||
|
||||
}
|
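For the monotone MaxAgeInDisk bounds used in testAgeBasedAssignment above (10, 100 and 1000 ms, with no bound on the last tier and MaxSize left unbounded), the tier a file lands in can be read directly from its age, as in this small illustrative helper that is not part of the patch:

public class TierForAgeSketch {
  // Given per-tier MaxAgeInDisk bounds ordered newest tier first, return the tier index
  // a file of the given age falls into; ages beyond every bound land in the last tier.
  static int tierForAge(long ageInDisk, long[] maxAgePerTier) {
    for (int tier = 0; tier < maxAgePerTier.length; tier++) {
      if (ageInDisk <= maxAgePerTier[tier]) {
        return tier;
      }
    }
    return maxAgePerTier.length;   // the oldest tier has no age bound
  }

  public static void main(String[] args) {
    long[] bounds = {10L, 100L, 1000L};           // Tier.0, Tier.1, Tier.2 from the test above
    System.out.println(tierForAge(8, bounds));    // 0 : "everything in first tier"
    System.out.println(tierForAge(80, bounds));   // 1 : "everything in second tier"
    System.out.println(tierForAge(800, bounds));  // 2 : "everything in third tier"
    System.out.println(tierForAge(5000, bounds)); // 3 : "everything in fourth tier"
  }
}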