HBASE-15400 Use DateTieredCompactor for Date Tiered Compaction (Clara Xiong)

This commit is contained in:
tedyu 2016-04-07 14:58:59 -07:00
parent d393603dea
commit f60fc9d1a0
16 changed files with 1104 additions and 404 deletions

View File

@ -0,0 +1,102 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.compactions.DateTieredCompactionPolicy;
import org.apache.hadoop.hbase.regionserver.compactions.DateTieredCompactionRequest;
import org.apache.hadoop.hbase.regionserver.compactions.DateTieredCompactor;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.security.User;
/**
 * HBASE-15400 Store engine that keeps data in a date tiered layout with exponential sizing, so
 * that the more recent data has more granularity. Time-range scans perform best on the most
 * recent data. When data reach maxAge, they are compacted in fixed-size time windows for TTL
 * and archiving. Please refer to the design spec for more details:
 * https://docs.google.com/document/d/1_AmlNb2N8Us1xICsTeGDLKIqL6T-oHoRLZ323MG_uy8/edit#heading=h.uk6y5pd3oqgx
 * <p>
 * The engine wires a {@link DateTieredCompactionPolicy} and a {@link DateTieredCompactor}
 * together with the default flusher and the default store file manager.
 */
@InterfaceAudience.Private
public class DateTieredStoreEngine extends StoreEngine<DefaultStoreFlusher,
    DateTieredCompactionPolicy, DateTieredCompactor, DefaultStoreFileManager> {

  /**
   * Asks the compaction policy whether the store's current files need compacting.
   * @param filesCompacting files that are already being compacted
   * @return whatever the policy's {@code needsCompaction} reports for the current store files
   */
  @Override
  public boolean needsCompaction(List<StoreFile> filesCompacting) {
    return compactionPolicy.needsCompaction(storeFileManager.getStorefiles(),
      filesCompacting);
  }

  /**
   * @return a new, empty compaction context bound to this engine
   */
  @Override
  public CompactionContext createCompaction() throws IOException {
    return new DateTieredCompactionContext();
  }

  /**
   * Instantiates the policy, store file manager, flusher and compactor of this engine.
   * @param conf configuration handed to every component
   * @param store the store this engine serves
   * @param kvComparator cell comparator handed to the store file manager
   */
  @Override
  protected void createComponents(Configuration conf, Store store, CellComparator kvComparator)
      throws IOException {
    // The policy must be created first: the store file manager is built from its configuration.
    this.compactionPolicy = new DateTieredCompactionPolicy(conf, store);
    this.storeFileManager = new DefaultStoreFileManager(kvComparator, conf,
      compactionPolicy.getConf());
    this.storeFlusher = new DefaultStoreFlusher(conf, store);
    this.compactor = new DateTieredCompactor(conf, store);
  }

  /**
   * Returns the given request as a {@link DateTieredCompactionRequest}, or fails with a
   * descriptive error. A null request is reported as "null" instead of surfacing as a
   * NullPointerException while the message is being built.
   */
  private static DateTieredCompactionRequest requireDateTiered(CompactionRequest candidate) {
    if (candidate instanceof DateTieredCompactionRequest) {
      return (DateTieredCompactionRequest) candidate;
    }
    String actual = candidate == null ? "null" : candidate.getClass().getCanonicalName();
    throw new IllegalArgumentException("DateTieredCompactionRequest is expected. Actual: "
      + actual);
  }

  /**
   * Compaction context that selects files through the date tiered policy and compacts them with
   * the date tiered compactor, using the window boundaries carried by the request.
   */
  private final class DateTieredCompactionContext extends CompactionContext {

    /**
     * Delegates to the policy's {@code preSelectCompactionForCoprocessor} with the current store
     * files.
     */
    @Override
    public List<StoreFile> preSelect(List<StoreFile> filesCompacting) {
      return compactionPolicy.preSelectCompactionForCoprocessor(storeFileManager.getStorefiles(),
        filesCompacting);
    }

    /**
     * Lets the policy select a compaction and remembers the resulting request.
     * @return true if the policy produced a request, false if it returned null
     */
    @Override
    public boolean select(List<StoreFile> filesCompacting, boolean isUserCompaction,
        boolean mayUseOffPeak, boolean forceMajor) throws IOException {
      request = compactionPolicy.selectCompaction(storeFileManager.getStorefiles(), filesCompacting,
        isUserCompaction, mayUseOffPeak, forceMajor);
      return request != null;
    }

    /**
     * Accepts an externally supplied request, which must be a
     * {@link DateTieredCompactionRequest} because {@code compact} needs its boundaries.
     * @throws IllegalArgumentException if the request is null or of any other type
     */
    @Override
    public void forceSelect(CompactionRequest request) {
      requireDateTiered(request);
      super.forceSelect(request);
    }

    /**
     * Runs the selected compaction, passing the request's boundaries to the compactor.
     * @return the paths returned by the compactor
     * @throws IllegalArgumentException if no {@link DateTieredCompactionRequest} has been
     *           selected (including when nothing has been selected at all)
     */
    @Override
    public List<Path> compact(ThroughputController throughputController, User user)
        throws IOException {
      DateTieredCompactionRequest dateTieredRequest = requireDateTiered(request);
      return compactor.compact(request, dateTieredRequest.getBoundaries(),
        throughputController, user);
    }
  }
}

View File

@ -1521,7 +1521,7 @@ public class HStore implements Store {
return false; return false;
} }
} }
return storeEngine.getCompactionPolicy().isMajorCompaction( return storeEngine.getCompactionPolicy().shouldPerformMajorCompaction(
this.storeEngine.getStoreFileManager().getStorefiles()); this.storeEngine.getStoreFileManager().getStorefiles());
} }

View File

@ -18,6 +18,12 @@
*/ */
package org.apache.hadoop.hbase.regionserver; package org.apache.hadoop.hbase.regionserver;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Ordering;
import java.io.DataInput; import java.io.DataInput;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
@ -62,12 +68,6 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Writables; import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.io.WritableUtils; import org.apache.hadoop.io.WritableUtils;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Ordering;
/** /**
* A Store data file. Stores usually have one or more of these files. They * A Store data file. Stores usually have one or more of these files. They
* are produced by flushing the memstore to disk. To * are produced by flushing the memstore to disk. To
@ -384,7 +384,7 @@ public class StoreFile {
* is turned off, fall back to BULKLOAD_TIME_KEY. * is turned off, fall back to BULKLOAD_TIME_KEY.
* @return true if this storefile was created by bulk load. * @return true if this storefile was created by bulk load.
*/ */
boolean isBulkLoadResult() { public boolean isBulkLoadResult() {
boolean bulkLoadedHFile = false; boolean bulkLoadedHFile = false;
String fileName = this.getPath().getName(); String fileName = this.getPath().getName();
int startPos = fileName.indexOf("SeqId_"); int startPos = fileName.indexOf("SeqId_");
@ -1777,6 +1777,19 @@ public class StoreFile {
Ordering.natural().onResultOf(new GetPathName()) Ordering.natural().onResultOf(new GetPathName())
)); ));
/**
* Comparator for time-aware compaction. SeqId is still the first
* ordering criterion to maintain MVCC.
*/
public static final Comparator<StoreFile> SEQ_ID_MAX_TIMESTAMP =
Ordering.compound(ImmutableList.of(
Ordering.natural().onResultOf(new GetSeqId()),
Ordering.natural().onResultOf(new GetMaxTimestamp()),
Ordering.natural().onResultOf(new GetFileSize()).reverse(),
Ordering.natural().onResultOf(new GetBulkTime()),
Ordering.natural().onResultOf(new GetPathName())
));
private static class GetSeqId implements Function<StoreFile, Long> { private static class GetSeqId implements Function<StoreFile, Long> {
@Override @Override
public Long apply(StoreFile sf) { public Long apply(StoreFile sf) {
@ -1811,5 +1824,12 @@ public class StoreFile {
return sf.getPath().getName(); return sf.getPath().getName();
} }
} }
private static class GetMaxTimestamp implements Function<StoreFile, Long> {
@Override
public Long apply(StoreFile sf) {
return sf.getMaximumTimestamp() == null? (Long)Long.MAX_VALUE : sf.getMaximumTimestamp();
}
}
} }
} }

View File

@ -80,6 +80,8 @@ public class CompactionConfiguration {
"hbase.hstore.compaction.date.tiered.incoming.window.min"; "hbase.hstore.compaction.date.tiered.incoming.window.min";
public static final String COMPACTION_POLICY_CLASS_FOR_TIERED_WINDOWS_KEY = public static final String COMPACTION_POLICY_CLASS_FOR_TIERED_WINDOWS_KEY =
"hbase.hstore.compaction.date.tiered.window.policy.class"; "hbase.hstore.compaction.date.tiered.window.policy.class";
public static final String SINGLE_OUTPUT_FOR_MINOR_COMPACTION_KEY =
"hbase.hstore.compaction.date.tiered.single.output.for.minor.compaction";
private static final Class<? extends RatioBasedCompactionPolicy> private static final Class<? extends RatioBasedCompactionPolicy>
DEFAULT_TIER_COMPACTION_POLICY_CLASS = ExploringCompactionPolicy.class; DEFAULT_TIER_COMPACTION_POLICY_CLASS = ExploringCompactionPolicy.class;
@ -105,6 +107,7 @@ public class CompactionConfiguration {
private final int windowsPerTier; private final int windowsPerTier;
private final int incomingWindowMin; private final int incomingWindowMin;
private final String compactionPolicyForTieredWindow; private final String compactionPolicyForTieredWindow;
private final boolean singleOutputForMinorCompaction;
CompactionConfiguration(Configuration conf, StoreConfigInformation storeConfigInfo) { CompactionConfiguration(Configuration conf, StoreConfigInformation storeConfigInfo) {
this.conf = conf; this.conf = conf;
@ -134,6 +137,9 @@ public class CompactionConfiguration {
incomingWindowMin = conf.getInt(INCOMING_WINDOW_MIN_KEY, 6); incomingWindowMin = conf.getInt(INCOMING_WINDOW_MIN_KEY, 6);
compactionPolicyForTieredWindow = conf.get(COMPACTION_POLICY_CLASS_FOR_TIERED_WINDOWS_KEY, compactionPolicyForTieredWindow = conf.get(COMPACTION_POLICY_CLASS_FOR_TIERED_WINDOWS_KEY,
DEFAULT_TIER_COMPACTION_POLICY_CLASS.getName()); DEFAULT_TIER_COMPACTION_POLICY_CLASS.getName());
singleOutputForMinorCompaction = conf.getBoolean(SINGLE_OUTPUT_FOR_MINOR_COMPACTION_KEY,
true);
LOG.info(this); LOG.info(this);
} }
@ -274,4 +280,8 @@ public class CompactionConfiguration {
public String getCompactionPolicyForTieredWindow() { public String getCompactionPolicyForTieredWindow() {
return compactionPolicyForTieredWindow; return compactionPolicyForTieredWindow;
} }
public boolean useSingleOutputForMinorCompaction() {
return singleOutputForMinorCompaction;
}
} }

View File

@ -45,7 +45,7 @@ public abstract class CompactionPolicy {
* @param filesToCompact Files to compact. Can be null. * @param filesToCompact Files to compact. Can be null.
* @return True if we should run a major compaction. * @return True if we should run a major compaction.
*/ */
public abstract boolean isMajorCompaction( public abstract boolean shouldPerformMajorCompaction(
final Collection<StoreFile> filesToCompact) throws IOException; final Collection<StoreFile> filesToCompact) throws IOException;
/** /**

View File

@ -18,6 +18,12 @@
*/ */
package org.apache.hadoop.hbase.regionserver.compactions; package org.apache.hadoop.hbase.regionserver.compactions;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.collect.Collections2;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
@ -31,12 +37,6 @@ import org.apache.hadoop.hbase.regionserver.StoreFile.Reader;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.collect.Collections2;
/** /**
* This class holds all logical details necessary to run a compaction. * This class holds all logical details necessary to run a compaction.
*/ */
@ -76,6 +76,10 @@ public class CompactionRequest implements Comparable<CompactionRequest> {
recalculateSize(); recalculateSize();
} }
public void updateFiles(Collection<StoreFile> files) {
this.filesToCompact = files;
}
/** /**
* Called before compaction is executed by CompactSplitThread; for use by coproc subclasses. * Called before compaction is executed by CompactSplitThread; for use by coproc subclasses.
*/ */

View File

@ -1,4 +1,5 @@
/** /**
*
* Licensed to the Apache Software Foundation (ASF) under one * Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file * or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information * distributed with this work for additional information
@ -19,11 +20,11 @@ package org.apache.hadoop.hbase.regionserver.compactions;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Predicate; import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables; import com.google.common.collect.Iterables;
import com.google.common.collect.Iterators; import com.google.common.collect.Iterators;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.PeekingIterator; import com.google.common.collect.PeekingIterator;
import com.google.common.math.LongMath;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
@ -35,9 +36,13 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
import org.apache.hadoop.hbase.regionserver.StoreConfigInformation; import org.apache.hadoop.hbase.regionserver.StoreConfigInformation;
import org.apache.hadoop.hbase.regionserver.StoreFile; import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.ReflectionUtils; import org.apache.hadoop.hbase.util.ReflectionUtils;
@ -50,14 +55,13 @@ import org.apache.hadoop.hbase.util.ReflectionUtils;
* 3. Improve TTL efficiency. * 3. Improve TTL efficiency.
* Perfect fit for the use cases that: * Perfect fit for the use cases that:
* 1. has mostly date-based data write and scan and a focus on the most recent data. * 1. has mostly date-based data write and scan and a focus on the most recent data.
* 2. never or rarely deletes data. Out-of-order writes are handled gracefully. Time range * Out-of-order writes are handled gracefully. Time range overlapping among store files is
* overlapping among store files is tolerated and the performance impact is minimized. Configuration * tolerated and the performance impact is minimized. Configuration can be set at hbase-site
* can be set at hbase-site or overriden at per-table or per-column-famly level by hbase shell. * or overridden at per-table or per-column-family level by hbase shell. Design spec is at
* Design spec is at
* https://docs.google.com/document/d/1_AmlNb2N8Us1xICsTeGDLKIqL6T-oHoRLZ323MG_uy8/ * https://docs.google.com/document/d/1_AmlNb2N8Us1xICsTeGDLKIqL6T-oHoRLZ323MG_uy8/
*/ */
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
public class DateTieredCompactionPolicy extends RatioBasedCompactionPolicy { public class DateTieredCompactionPolicy extends SortedCompactionPolicy {
private static final Log LOG = LogFactory.getLog(DateTieredCompactionPolicy.class); private static final Log LOG = LogFactory.getLog(DateTieredCompactionPolicy.class);
private RatioBasedCompactionPolicy compactionPolicyPerWindow; private RatioBasedCompactionPolicy compactionPolicyPerWindow;
@ -67,111 +71,112 @@ public class DateTieredCompactionPolicy extends RatioBasedCompactionPolicy {
super(conf, storeConfigInfo); super(conf, storeConfigInfo);
try { try {
compactionPolicyPerWindow = compactionPolicyPerWindow =
ReflectionUtils.instantiateWithCustomCtor(comConf.getCompactionPolicyForTieredWindow(), ReflectionUtils.instantiateWithCustomCtor(comConf.getCompactionPolicyForTieredWindow(),
new Class[] { Configuration.class, StoreConfigInformation.class }, new Object[] { conf, new Class[] { Configuration.class, StoreConfigInformation.class }, new Object[] { conf,
storeConfigInfo }); storeConfigInfo });
} catch (Exception e) { } catch (Exception e) {
throw new IOException("Unable to load configured compaction policy '" throw new IOException("Unable to load configured compaction policy '"
+ comConf.getCompactionPolicyForTieredWindow() + "'", e); + comConf.getCompactionPolicyForTieredWindow() + "'", e);
} }
} }
@Override
public boolean isMajorCompaction(Collection<StoreFile> filesToCompact) throws IOException {
// Never do major compaction unless forced
return false;
}
@Override
/** /**
* Heuristics for guessing whether we need compaction. * Heuristics for guessing whether we need minor compaction.
*/ */
public boolean needsCompaction(final Collection<StoreFile> storeFiles, @Override
final List<StoreFile> filesCompacting) {
return needsCompaction(storeFiles, filesCompacting, EnvironmentEdgeManager.currentTime());
}
@VisibleForTesting @VisibleForTesting
public boolean needsCompaction(final Collection<StoreFile> storeFiles, public boolean needsCompaction(final Collection<StoreFile> storeFiles,
final List<StoreFile> filesCompacting, long now) { final List<StoreFile> filesCompacting) {
if (!super.needsCompaction(storeFiles, filesCompacting)) {
return false;
}
ArrayList<StoreFile> candidates = new ArrayList<StoreFile>(storeFiles); ArrayList<StoreFile> candidates = new ArrayList<StoreFile>(storeFiles);
candidates = filterBulk(candidates);
candidates = skipLargeFiles(candidates, true);
try { try {
candidates = applyCompactionPolicy(candidates, true, false, now); return selectMinorCompaction(candidates, false, true) != null;
} catch (Exception e) { } catch (Exception e) {
LOG.error("Can not check for compaction: ", e); LOG.error("Can not check for compaction: ", e);
return false; return false;
} }
return candidates != null && candidates.size() >= comConf.getMinFilesToCompact();
} }
/** public boolean shouldPerformMajorCompaction(final Collection<StoreFile> filesToCompact)
* Could return null if no candidates are found throws IOException {
*/ long mcTime = getNextMajorCompactTime(filesToCompact);
@Override if (filesToCompact == null || mcTime == 0) {
public ArrayList<StoreFile> applyCompactionPolicy(ArrayList<StoreFile> candidates, return false;
boolean mayUseOffPeak, boolean mayBeStuck) throws IOException {
return applyCompactionPolicy(candidates, mayUseOffPeak, mayBeStuck,
EnvironmentEdgeManager.currentTime());
}
/**
* Input candidates are sorted from oldest to newest by seqId. Could return null if no candidates
* are found.
*/
@VisibleForTesting
public ArrayList<StoreFile> applyCompactionPolicy(ArrayList<StoreFile> candidates,
boolean mayUseOffPeak, boolean mayBeStuck, long now) throws IOException {
Iterable<StoreFile> candidatesInWindow =
filterOldStoreFiles(Lists.newArrayList(candidates), comConf.getMaxStoreFileAgeMillis(), now);
List<ArrayList<StoreFile>> buckets =
partitionFilesToBuckets(candidatesInWindow, comConf.getBaseWindowMillis(),
comConf.getWindowsPerTier(), now);
LOG.debug("Compaction buckets are: " + buckets);
if (buckets.size() >= storeConfigInfo.getBlockingFileCount()) {
LOG.warn("Number of compaction buckets:" + buckets.size()
+ ", exceeds blocking file count setting: "
+ storeConfigInfo.getBlockingFileCount()
+ ", either increase hbase.hstore.blockingStoreFiles or "
+ "reduce the number of tiered compaction windows");
} }
return newestBucket(buckets, comConf.getIncomingWindowMin(), now, // TODO: Use better method for determining stamp of last major (HBASE-2990)
comConf.getBaseWindowMillis(), mayUseOffPeak); long lowTimestamp = StoreUtils.getLowestTimestamp(filesToCompact);
} long now = EnvironmentEdgeManager.currentTime();
if (lowTimestamp <= 0L || lowTimestamp >= (now - mcTime)) {
return false;
}
/** long cfTTL = this.storeConfigInfo.getStoreFileTtl();
* @param buckets the list of buckets, sorted from newest to oldest, from which to return the HDFSBlocksDistribution hdfsBlocksDistribution = new HDFSBlocksDistribution();
* newest bucket within thresholds. long oldestToCompact = getOldestToCompact(comConf.getMaxStoreFileAgeMillis(), now);
* @param incomingWindowThreshold minimum number of storeFiles in a bucket to qualify. List<Long> boundaries = getCompactBoundariesForMajor(filesToCompact, oldestToCompact, now);
* @param maxThreshold maximum number of storeFiles to compact at once (the returned bucket will boolean[] filesInWindow = new boolean[boundaries.size()];
* be trimmed down to this).
* @return a bucket (a list of store files within a window to be compacted). for (StoreFile file: filesToCompact) {
* @throws IOException error Long minTimestamp = file.getMinimumTimestamp();
*/ long oldest = (minTimestamp == null) ? (Long)Long.MIN_VALUE : now - minTimestamp.longValue();
private ArrayList<StoreFile> newestBucket(List<ArrayList<StoreFile>> buckets, if (cfTTL != HConstants.FOREVER && oldest >= cfTTL) {
int incomingWindowThreshold, long now, long baseWindowMillis, boolean mayUseOffPeak) LOG.debug("Major compaction triggered on store " + this
throws IOException { + "; for TTL maintenance");
Window incomingWindow = getInitialWindow(now, baseWindowMillis); return true;
for (ArrayList<StoreFile> bucket : buckets) {
int minThreshold =
incomingWindow.compareToTimestamp(bucket.get(0).getMaximumTimestamp()) <= 0 ? comConf
.getIncomingWindowMin() : comConf.getMinFilesToCompact();
compactionPolicyPerWindow.setMinThreshold(minThreshold);
ArrayList<StoreFile> candidates =
compactionPolicyPerWindow.applyCompactionPolicy(bucket, mayUseOffPeak, false);
if (candidates != null && !candidates.isEmpty()) {
return candidates;
} }
if (!file.isMajorCompaction() || file.isBulkLoadResult()) {
LOG.debug("Major compaction triggered on store " + this
+ ", because there are new files and time since last major compaction "
+ (now - lowTimestamp) + "ms");
return true;
}
int lowerWindowIndex = Collections.binarySearch(boundaries,
minTimestamp == null ? (Long)Long.MAX_VALUE : minTimestamp);
int upperWindowIndex = Collections.binarySearch(boundaries,
file.getMaximumTimestamp() == null ? (Long)Long.MAX_VALUE : file.getMaximumTimestamp());
if (lowerWindowIndex != upperWindowIndex) {
LOG.debug("Major compaction triggered on store " + this + "; because file "
+ file.getPath() + " has data with timestamps cross window boundaries");
return true;
} else if (filesInWindow[upperWindowIndex]) {
LOG.debug("Major compaction triggered on store " + this +
"; because there are more than one file in some windows");
return true;
} else {
filesInWindow[upperWindowIndex] = true;
}
hdfsBlocksDistribution.add(file.getHDFSBlockDistribution());
} }
return null;
float blockLocalityIndex = hdfsBlocksDistribution
.getBlockLocalityIndex(RSRpcServices.getHostname(comConf.conf, false));
if (blockLocalityIndex < comConf.getMinLocalityToForceCompact()) {
LOG.debug("Major compaction triggered on store " + this
+ "; to make hdfs blocks local, current blockLocalityIndex is "
+ blockLocalityIndex + " (min " + comConf.getMinLocalityToForceCompact() + ")");
return true;
}
LOG.debug("Skipping major compaction of " + this +
", because the files are already major compacted");
return false;
}
@Override
protected CompactionRequest createCompactionRequest(ArrayList<StoreFile> candidateSelection,
boolean tryingMajor, boolean mayUseOffPeak, boolean mayBeStuck) throws IOException {
CompactionRequest result = tryingMajor ? selectMajorCompaction(candidateSelection)
: selectMinorCompaction(candidateSelection, mayUseOffPeak, mayBeStuck);
LOG.debug("Generated compaction request: " + result);
return result;
}
public CompactionRequest selectMajorCompaction(ArrayList<StoreFile> candidateSelection) {
long now = EnvironmentEdgeManager.currentTime();
long oldestToCompact = getOldestToCompact(comConf.getMaxStoreFileAgeMillis(), now);
return new DateTieredCompactionRequest(candidateSelection,
this.getCompactBoundariesForMajor(candidateSelection, oldestToCompact, now));
} }
/** /**
@ -179,63 +184,134 @@ public class DateTieredCompactionPolicy extends RatioBasedCompactionPolicy {
* current file has a maxTimestamp older than last known maximum, treat this file as it carries * current file has a maxTimestamp older than last known maximum, treat this file as it carries
* the last known maximum. This way both seqId and timestamp are in the same order. If files carry * the last known maximum. This way both seqId and timestamp are in the same order. If files carry
* the same maxTimestamps, they are ordered by seqId. We then reverse the list so they are ordered * the same maxTimestamps, they are ordered by seqId. We then reverse the list so they are ordered
* by seqId and maxTimestamp in decending order and build the time windows. All the out-of-order * by seqId and maxTimestamp in descending order and build the time windows. All the out-of-order
* data into the same compaction windows, guaranteeing contiguous compaction based on sequence id. * data into the same compaction windows, guaranteeing contiguous compaction based on sequence id.
*/ */
private static List<ArrayList<StoreFile>> partitionFilesToBuckets(Iterable<StoreFile> storeFiles, public CompactionRequest selectMinorCompaction(ArrayList<StoreFile> candidateSelection,
long baseWindowSizeMillis, int windowsPerTier, long now) { boolean mayUseOffPeak, boolean mayBeStuck) throws IOException {
List<ArrayList<StoreFile>> buckets = Lists.newArrayList(); long now = EnvironmentEdgeManager.currentTime();
Window window = getInitialWindow(now, baseWindowSizeMillis); long oldestToCompact = getOldestToCompact(comConf.getMaxStoreFileAgeMillis(), now);
// Make sure the store files are sorted by SeqId, then by maxTimestamp
List<StoreFile> storeFileList = Lists.newArrayList(filterOldStoreFiles(candidateSelection,
oldestToCompact));
Collections.sort(storeFileList, StoreFile.Comparators.SEQ_ID_MAX_TIMESTAMP);
List<Pair<StoreFile, Long>> storefileMaxTimestampPairs = List<Pair<StoreFile, Long>> storefileMaxTimestampPairs =
Lists.newArrayListWithCapacity(Iterables.size(storeFiles)); Lists.newArrayListWithCapacity(Iterables.size(storeFileList));
long maxTimestampSeen = Long.MIN_VALUE; long maxTimestampSeen = Long.MIN_VALUE;
for (StoreFile storeFile : storeFiles) { for (StoreFile storeFile : storeFileList) {
// if there is out-of-order data, // if there is out-of-order data,
// we put them in the same window as the last file in increasing order // we put them in the same window as the last file in increasing order
maxTimestampSeen = Math.max(maxTimestampSeen, storeFile.getMaximumTimestamp()); maxTimestampSeen = Math.max(maxTimestampSeen,
storeFile.getMaximumTimestamp() == null? Long.MIN_VALUE : storeFile.getMaximumTimestamp());
storefileMaxTimestampPairs.add(new Pair<StoreFile, Long>(storeFile, maxTimestampSeen)); storefileMaxTimestampPairs.add(new Pair<StoreFile, Long>(storeFile, maxTimestampSeen));
} }
Collections.reverse(storefileMaxTimestampPairs); Collections.reverse(storefileMaxTimestampPairs);
Window window = getIncomingWindow(now, comConf.getBaseWindowMillis());
int minThreshold = comConf.getIncomingWindowMin();
PeekingIterator<Pair<StoreFile, Long>> it = PeekingIterator<Pair<StoreFile, Long>> it =
Iterators.peekingIterator(storefileMaxTimestampPairs.iterator()); Iterators.peekingIterator(storefileMaxTimestampPairs.iterator());
while (it.hasNext()) { while (it.hasNext()) {
int compResult = window.compareToTimestamp(it.peek().getSecond()); int compResult = window.compareToTimestamp(it.peek().getSecond());
if (compResult > 0) { if (compResult > 0) {
// If the file is too old for the window, switch to the next window // If the file is too old for the window, switch to the next window
window = window.nextWindow(windowsPerTier); window = window.nextWindow(comConf.getWindowsPerTier(),
oldestToCompact);
minThreshold = comConf.getMinFilesToCompact();
} else { } else {
// The file is within the target window // The file is within the target window
ArrayList<StoreFile> bucket = Lists.newArrayList(); ArrayList<StoreFile> fileList = Lists.newArrayList();
// Add all files in the same window to current bucket. For incoming window // Add all files in the same window. For incoming window
// we tolerate files with future data although it is sub-optimal // we tolerate files with future data although it is sub-optimal
while (it.hasNext() && window.compareToTimestamp(it.peek().getSecond()) <= 0) { while (it.hasNext() && window.compareToTimestamp(it.peek().getSecond()) <= 0) {
bucket.add(it.next().getFirst()); fileList.add(it.next().getFirst());
} }
if (!bucket.isEmpty()) { if (fileList.size() >= minThreshold) {
buckets.add(bucket); LOG.debug("Processing files: " + fileList + " for window: " + window);
DateTieredCompactionRequest request = generateCompactionRequest(fileList, window,
mayUseOffPeak, mayBeStuck, minThreshold);
if (request != null) {
return request;
}
} }
} }
} }
// A non-null file list is expected by HStore
return new CompactionRequest(Collections.<StoreFile> emptyList());
}
return buckets; private DateTieredCompactionRequest generateCompactionRequest(ArrayList<StoreFile> storeFiles,
Window window, boolean mayUseOffPeak, boolean mayBeStuck, int minThreshold)
throws IOException {
// The files have to be in ascending order for ratio-based compaction to work right
// and for removeExcessFile to exclude the youngest files.
Collections.reverse(storeFiles);
// Compact everything in the window if there are more files than comConf.maxBlockingFiles
compactionPolicyPerWindow.setMinThreshold(minThreshold);
ArrayList<StoreFile> storeFileSelection = mayBeStuck ? storeFiles
: compactionPolicyPerWindow.applyCompactionPolicy(storeFiles, mayUseOffPeak, false);
if (storeFileSelection != null && !storeFileSelection.isEmpty()) {
// If there is any file in the window excluded from compaction,
// only one file will be output from compaction.
boolean singleOutput = storeFiles.size() != storeFileSelection.size() ||
comConf.useSingleOutputForMinorCompaction();
List<Long> boundaries = getCompactionBoundariesForMinor(window, singleOutput);
DateTieredCompactionRequest result = new DateTieredCompactionRequest(storeFileSelection,
boundaries);
return result;
}
return null;
}
/**
* Return a list of boundaries for multiple compaction output
* in ascending order.
*/
private List<Long> getCompactBoundariesForMajor(Collection<StoreFile> filesToCompact,
long oldestToCompact, long now) {
long minTimestamp = Long.MAX_VALUE;
for (StoreFile file : filesToCompact) {
minTimestamp = Math.min(minTimestamp,
file.getMinimumTimestamp() == null? Long.MAX_VALUE : file.getMinimumTimestamp());
}
List<Long> boundaries = new ArrayList<Long>();
// Add startMillis of all windows between now and min timestamp
for (Window window = getIncomingWindow(now, comConf.getBaseWindowMillis());
window.compareToTimestamp(minTimestamp) > 0;
window = window.nextWindow(comConf.getWindowsPerTier(), oldestToCompact)) {
boundaries.add(window.startMillis());
}
boundaries.add(Long.MIN_VALUE);
Collections.reverse(boundaries);
return boundaries;
}
/**
* @return a list of boundaries for multiple compaction output
* from minTimestamp to maxTimestamp.
*/
private static List<Long> getCompactionBoundariesForMinor(Window window, boolean singleOutput) {
List<Long> boundaries = new ArrayList<Long>();
boundaries.add(Long.MIN_VALUE);
if (!singleOutput) {
boundaries.add(window.startMillis());
}
return boundaries;
} }
/** /**
* Removes all store files with max timestamp older than (current - maxAge). * Removes all store files with max timestamp older than (current - maxAge).
* @param storeFiles all store files to consider * @param storeFiles all store files to consider
* @param maxAge the age in milliseconds when a store file stops participating in compaction. * @param maxAge the age in milliseconds when a store file stops participating in compaction.
* @param now current time. store files with max timestamp less than (now - maxAge) are filtered.
* @return a list of storeFiles with the store file older than maxAge excluded * @return a list of storeFiles with the store file older than maxAge excluded
*/ */
private static Iterable<StoreFile> filterOldStoreFiles(List<StoreFile> storeFiles, long maxAge, private static Iterable<StoreFile> filterOldStoreFiles(List<StoreFile> storeFiles,
long now) { final long cutoff) {
if (maxAge == 0) {
return ImmutableList.of();
}
final long cutoff = now - maxAge;
return Iterables.filter(storeFiles, new Predicate<StoreFile>() { return Iterables.filter(storeFiles, new Predicate<StoreFile>() {
@Override @Override
public boolean apply(StoreFile storeFile) { public boolean apply(StoreFile storeFile) {
@ -243,13 +319,24 @@ public class DateTieredCompactionPolicy extends RatioBasedCompactionPolicy {
if (storeFile == null) { if (storeFile == null) {
return false; return false;
} }
return storeFile.getMaximumTimestamp() >= cutoff; Long maxTimestamp = storeFile.getMaximumTimestamp();
return maxTimestamp == null ? true : maxTimestamp >= cutoff;
} }
}); });
} }
private static Window getInitialWindow(long now, long timeUnit) { private static Window getIncomingWindow(long now, long baseWindowMillis) {
return new Window(timeUnit, now / timeUnit); return new Window(baseWindowMillis, now / baseWindowMillis);
}
private static long getOldestToCompact(long maxAgeMillis, long now) {
try {
return LongMath.checkedSubtract(now, maxAgeMillis);
} catch (ArithmeticException ae) {
LOG.warn("Value for " + CompactionConfiguration.MAX_AGE_MILLIS_KEY + ": " + maxAgeMillis
+ ". All the files will be eligible for minor compaction.");
return Long.MIN_VALUE;
}
} }
/** /**
@ -268,7 +355,7 @@ public class DateTieredCompactionPolicy extends RatioBasedCompactionPolicy {
private final long divPosition; private final long divPosition;
private Window(long baseWindowMillis, long divPosition) { private Window(long baseWindowMillis, long divPosition) {
this.windowMillis = baseWindowMillis; windowMillis = baseWindowMillis;
this.divPosition = divPosition; this.divPosition = divPosition;
} }
@ -279,6 +366,13 @@ public class DateTieredCompactionPolicy extends RatioBasedCompactionPolicy {
* or after than the timestamp. * or after than the timestamp.
*/ */
public int compareToTimestamp(long timestamp) { public int compareToTimestamp(long timestamp) {
if (timestamp < 0) {
try {
timestamp = LongMath.checkedSubtract(timestamp, windowMillis - 1);
} catch (ArithmeticException ae) {
timestamp = Long.MIN_VALUE;
}
}
long pos = timestamp / windowMillis; long pos = timestamp / windowMillis;
return divPosition == pos ? 0 : divPosition < pos ? -1 : 1; return divPosition == pos ? 0 : divPosition < pos ? -1 : 1;
} }
@ -290,12 +384,42 @@ public class DateTieredCompactionPolicy extends RatioBasedCompactionPolicy {
* following those will be <code>tierBase</code> times as big. * following those will be <code>tierBase</code> times as big.
* @return The next window * @return The next window
*/ */
public Window nextWindow(int windowsPerTier) { public Window nextWindow(int windowsPerTier, long oldestToCompact) {
if (divPosition % windowsPerTier > 0) { // Don't promote to the next tier if there is not even 1 window at current tier
// or if the next window crosses the max age.
if (divPosition % windowsPerTier > 0 ||
startMillis() - windowMillis * windowsPerTier < oldestToCompact) {
return new Window(windowMillis, divPosition - 1); return new Window(windowMillis, divPosition - 1);
} else { } else {
return new Window(windowMillis * windowsPerTier, divPosition / windowsPerTier - 1); return new Window(windowMillis * windowsPerTier, divPosition / windowsPerTier - 1);
} }
} }
/**
 * Inclusive lower bound of this window.
 * @return <code>windowMillis * divPosition</code>, or <code>Long.MIN_VALUE</code> when that
 *         product overflows a long
 */
public long startMillis() {
  long start;
  try {
    start = LongMath.checkedMultiply(windowMillis, divPosition);
  } catch (ArithmeticException overflow) {
    start = Long.MIN_VALUE;
  }
  return start;
}
/**
 * Exclusive upper bound of this window.
 * @return <code>windowMillis * (divPosition + 1)</code>, or <code>Long.MAX_VALUE</code> when
 *         that product overflows a long
 */
public long endMillis() {
  long end;
  try {
    end = LongMath.checkedMultiply(windowMillis, (divPosition + 1));
  } catch (ArithmeticException overflow) {
    end = Long.MAX_VALUE;
  }
  return end;
}
/**
 * Renders the window as a half-open interval, e.g. <code>[start, end)</code>.
 */
@Override
public String toString() {
  StringBuilder text = new StringBuilder();
  text.append("[").append(startMillis());
  text.append(", ").append(endMillis());
  text.append(")");
  return text.toString();
}
} }
} }

View File

@ -0,0 +1,44 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.compactions;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import org.apache.hadoop.hbase.regionserver.StoreFile;
/**
 * A {@link CompactionRequest} that additionally carries the list of boundaries handed to it at
 * construction time. Equality is deliberately inherited from the superclass.
 */
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="EQ_DOESNT_OVERRIDE_EQUALS",
  justification="It is intended to use the same equal method as superclass")
public class DateTieredCompactionRequest extends CompactionRequest {
  /** Boundary list supplied by the creator; stored by reference, not copied. */
  private final List<Long> boundaries;

  /**
   * @param files files selected for this compaction
   * @param boundaryList boundaries associated with the request
   */
  public DateTieredCompactionRequest(Collection<StoreFile> files, List<Long> boundaryList) {
    super(files);
    this.boundaries = boundaryList;
  }

  /**
   * @return the boundary list given to the constructor (same instance)
   */
  public List<Long> getBoundaries() {
    return this.boundaries;
  }

  /**
   * @return the superclass description followed by the boundaries
   */
  @Override
  public String toString() {
    StringBuilder text = new StringBuilder(super.toString());
    text.append(" boundaries=");
    text.append(Arrays.toString(boundaries.toArray()));
    return text.toString();
  }
}

View File

@ -25,8 +25,8 @@ import java.util.List;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.regionserver.StoreConfigInformation; import org.apache.hadoop.hbase.regionserver.StoreConfigInformation;
import org.apache.hadoop.hbase.regionserver.StoreFile; import org.apache.hadoop.hbase.regionserver.StoreFile;
@ -51,7 +51,7 @@ public class ExploringCompactionPolicy extends RatioBasedCompactionPolicy {
} }
@Override @Override
final ArrayList<StoreFile> applyCompactionPolicy(final ArrayList<StoreFile> candidates, protected final ArrayList<StoreFile> applyCompactionPolicy(final ArrayList<StoreFile> candidates,
final boolean mayUseOffPeak, final boolean mightBeStuck) throws IOException { final boolean mayUseOffPeak, final boolean mightBeStuck) throws IOException {
return new ArrayList<StoreFile>(applyCompactionPolicy(candidates, mightBeStuck, return new ArrayList<StoreFile>(applyCompactionPolicy(candidates, mightBeStuck,
mayUseOffPeak, comConf.getMinFilesToCompact(), comConf.getMaxFilesToCompact())); mayUseOffPeak, comConf.getMinFilesToCompact(), comConf.getMaxFilesToCompact()));

View File

@ -76,11 +76,12 @@ public class FIFOCompactionPolicy extends ExploringCompactionPolicy {
} }
@Override @Override
public boolean isMajorCompaction(Collection<StoreFile> filesToCompact) throws IOException { public boolean shouldPerformMajorCompaction(Collection<StoreFile> filesToCompact)
throws IOException {
boolean isAfterSplit = StoreUtils.hasReferences(filesToCompact); boolean isAfterSplit = StoreUtils.hasReferences(filesToCompact);
if(isAfterSplit){ if(isAfterSplit){
LOG.info("Split detected, delegate to the parent policy."); LOG.info("Split detected, delegate to the parent policy.");
return super.isMajorCompaction(filesToCompact); return super.shouldPerformMajorCompaction(filesToCompact);
} }
return false; return false;
} }

View File

@ -16,14 +16,12 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.hadoop.hbase.regionserver.compactions; package org.apache.hadoop.hbase.regionserver.compactions;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.Random;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -35,17 +33,13 @@ import org.apache.hadoop.hbase.regionserver.StoreConfigInformation;
import org.apache.hadoop.hbase.regionserver.StoreFile; import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreUtils; import org.apache.hadoop.hbase.regionserver.StoreUtils;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.collect.Collections2;
/** /**
* The default algorithm for selecting files for compaction. * The default algorithm for selecting files for compaction.
* Combines the compaction configuration and the provisional file selection that * Combines the compaction configuration and the provisional file selection that
* it's given to produce the list of suitable candidates for compaction. * it's given to produce the list of suitable candidates for compaction.
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
public class RatioBasedCompactionPolicy extends CompactionPolicy { public class RatioBasedCompactionPolicy extends SortedCompactionPolicy {
private static final Log LOG = LogFactory.getLog(RatioBasedCompactionPolicy.class); private static final Log LOG = LogFactory.getLog(RatioBasedCompactionPolicy.class);
public RatioBasedCompactionPolicy(Configuration conf, public RatioBasedCompactionPolicy(Configuration conf,
@ -53,154 +47,73 @@ public class RatioBasedCompactionPolicy extends CompactionPolicy {
super(conf, storeConfigInfo); super(conf, storeConfigInfo);
} }
private ArrayList<StoreFile> getCurrentEligibleFiles( /*
ArrayList<StoreFile> candidateFiles, final List<StoreFile> filesCompacting) { * @param filesToCompact Files to compact. Can be null.
// candidates = all storefiles not already in compaction queue * @return True if we should run a major compaction.
if (!filesCompacting.isEmpty()) {
// exclude all files older than the newest file we're currently
// compacting. this allows us to preserve contiguity (HBASE-2856)
StoreFile last = filesCompacting.get(filesCompacting.size() - 1);
int idx = candidateFiles.indexOf(last);
Preconditions.checkArgument(idx != -1);
candidateFiles.subList(0, idx + 1).clear();
}
return candidateFiles;
}
public List<StoreFile> preSelectCompactionForCoprocessor(
final Collection<StoreFile> candidates, final List<StoreFile> filesCompacting) {
return getCurrentEligibleFiles(new ArrayList<StoreFile>(candidates), filesCompacting);
}
/**
* @param candidateFiles candidate files, ordered from oldest to newest by seqId. We rely on
* DefaultStoreFileManager to sort the files by seqId to guarantee contiguous compaction based
* on seqId for data consistency.
* @return subset copy of candidate list that meets compaction criteria
* @throws java.io.IOException
*/ */
public CompactionRequest selectCompaction(Collection<StoreFile> candidateFiles, @Override
final List<StoreFile> filesCompacting, final boolean isUserCompaction, public boolean shouldPerformMajorCompaction(final Collection<StoreFile> filesToCompact)
final boolean mayUseOffPeak, final boolean forceMajor) throws IOException { throws IOException {
// Preliminary compaction subject to filters boolean result = false;
ArrayList<StoreFile> candidateSelection = new ArrayList<StoreFile>(candidateFiles); long mcTime = getNextMajorCompactTime(filesToCompact);
// Stuck and not compacting enough (estimate). It is not guaranteed that we will be if (filesToCompact == null || filesToCompact.isEmpty() || mcTime == 0) {
// able to compact more if stuck and compacting, because ratio policy excludes some return result;
// non-compacting files from consideration during compaction (see getCurrentEligibleFiles).
int futureFiles = filesCompacting.isEmpty() ? 0 : 1;
boolean mayBeStuck = (candidateFiles.size() - filesCompacting.size() + futureFiles)
>= storeConfigInfo.getBlockingFileCount();
candidateSelection = getCurrentEligibleFiles(candidateSelection, filesCompacting);
LOG.debug("Selecting compaction from " + candidateFiles.size() + " store files, " +
filesCompacting.size() + " compacting, " + candidateSelection.size() +
" eligible, " + storeConfigInfo.getBlockingFileCount() + " blocking");
// If we can't have all files, we cannot do major anyway
boolean isAllFiles = candidateFiles.size() == candidateSelection.size();
if (!(forceMajor && isAllFiles)) {
candidateSelection = skipLargeFiles(candidateSelection, mayUseOffPeak);
isAllFiles = candidateFiles.size() == candidateSelection.size();
} }
// TODO: Use better method for determining stamp of last major (HBASE-2990)
// Try a major compaction if this is a user-requested major compaction, long lowTimestamp = StoreUtils.getLowestTimestamp(filesToCompact);
// or if we do not have too many files to compact and this was requested as a major compaction long now = System.currentTimeMillis();
boolean isTryingMajor = (forceMajor && isAllFiles && isUserCompaction) if (lowTimestamp > 0L && lowTimestamp < (now - mcTime)) {
|| (((forceMajor && isAllFiles) || isMajorCompaction(candidateSelection)) // Major compaction time has elapsed.
&& (candidateSelection.size() < comConf.getMaxFilesToCompact())); long cfTTL = this.storeConfigInfo.getStoreFileTtl();
// Or, if there are any references among the candidates. if (filesToCompact.size() == 1) {
boolean isAfterSplit = StoreUtils.hasReferences(candidateSelection); // Single file
if (!isTryingMajor && !isAfterSplit) { StoreFile sf = filesToCompact.iterator().next();
// We're are not compacting all files, let's see what files are applicable Long minTimestamp = sf.getMinimumTimestamp();
candidateSelection = filterBulk(candidateSelection); long oldest = (minTimestamp == null) ? Long.MIN_VALUE : now - minTimestamp.longValue();
candidateSelection = applyCompactionPolicy(candidateSelection, mayUseOffPeak, mayBeStuck); if (sf.isMajorCompaction() && (cfTTL == HConstants.FOREVER || oldest < cfTTL)) {
candidateSelection = checkMinFilesCriteria(candidateSelection); float blockLocalityIndex =
sf.getHDFSBlockDistribution().getBlockLocalityIndex(
RSRpcServices.getHostname(comConf.conf, false));
if (blockLocalityIndex < comConf.getMinLocalityToForceCompact()) {
LOG.debug("Major compaction triggered on only store " + this
+ "; to make hdfs blocks local, current blockLocalityIndex is "
+ blockLocalityIndex + " (min " + comConf.getMinLocalityToForceCompact() + ")");
result = true;
} else {
LOG.debug("Skipping major compaction of " + this
+ " because one (major) compacted file only, oldestTime " + oldest
+ "ms is < TTL=" + cfTTL + " and blockLocalityIndex is " + blockLocalityIndex
+ " (min " + comConf.getMinLocalityToForceCompact() + ")");
}
} else if (cfTTL != HConstants.FOREVER && oldest > cfTTL) {
LOG.debug("Major compaction triggered on store " + this
+ ", because keyvalues outdated; time since last major compaction "
+ (now - lowTimestamp) + "ms");
result = true;
}
} else {
LOG.debug("Major compaction triggered on store " + this
+ "; time since last major compaction " + (now - lowTimestamp) + "ms");
}
result = true;
} }
candidateSelection = removeExcessFiles(candidateSelection, isUserCompaction, isTryingMajor);
// Now we have the final file list, so we can determine if we can do major/all files.
isAllFiles = (candidateFiles.size() == candidateSelection.size());
CompactionRequest result = new CompactionRequest(candidateSelection);
result.setOffPeak(!candidateSelection.isEmpty() && !isAllFiles && mayUseOffPeak);
result.setIsMajor(isTryingMajor && isAllFiles, isAllFiles);
return result; return result;
} }
/** @Override
* @param candidates pre-filtrate protected CompactionRequest createCompactionRequest(ArrayList<StoreFile>
* @return filtered subset candidateSelection, boolean tryingMajor, boolean mayUseOffPeak, boolean mayBeStuck)
* exclude all files above maxCompactSize throws IOException {
* Also save all references. We MUST compact them if (!tryingMajor) {
*/ candidateSelection = filterBulk(candidateSelection);
protected ArrayList<StoreFile> skipLargeFiles(ArrayList<StoreFile> candidates, candidateSelection = applyCompactionPolicy(candidateSelection, mayUseOffPeak, mayBeStuck);
boolean mayUseOffpeak) { candidateSelection = checkMinFilesCriteria(candidateSelection,
int pos = 0; comConf.getMinFilesToCompact());
while (pos < candidates.size() && !candidates.get(pos).isReference()
&& (candidates.get(pos).getReader().length() > comConf.getMaxCompactSize(mayUseOffpeak))) {
++pos;
} }
if (pos > 0) { return new CompactionRequest(candidateSelection);
LOG.debug("Some files are too large. Excluding " + pos
+ " files from compaction candidates");
candidates.subList(0, pos).clear();
}
return candidates;
} }
/** /**
* @param candidates pre-filtrate
* @return filtered subset
* exclude all bulk load files if configured
*/
protected ArrayList<StoreFile> filterBulk(ArrayList<StoreFile> candidates) {
candidates.removeAll(Collections2.filter(candidates,
new Predicate<StoreFile>() {
@Override
public boolean apply(StoreFile input) {
return input.excludeFromMinorCompaction();
}
}));
return candidates;
}
/**
* @param candidates pre-filtrate
* @return filtered subset
* take upto maxFilesToCompact from the start
*/
private ArrayList<StoreFile> removeExcessFiles(ArrayList<StoreFile> candidates,
boolean isUserCompaction, boolean isMajorCompaction) {
int excess = candidates.size() - comConf.getMaxFilesToCompact();
if (excess > 0) {
if (isMajorCompaction && isUserCompaction) {
LOG.debug("Warning, compacting more than " + comConf.getMaxFilesToCompact() +
" files because of a user-requested major compaction");
} else {
LOG.debug("Too many admissible files. Excluding " + excess
+ " files from compaction candidates");
candidates.subList(comConf.getMaxFilesToCompact(), candidates.size()).clear();
}
}
return candidates;
}
/**
* @param candidates pre-filtrate
* @return filtered subset
* forget the compactionSelection if we don't have enough files
*/
protected ArrayList<StoreFile> checkMinFilesCriteria(ArrayList<StoreFile> candidates) {
int minFiles = comConf.getMinFilesToCompact();
if (candidates.size() < minFiles) {
if(LOG.isDebugEnabled()) {
LOG.debug("Not compacting files because we only have " + candidates.size() +
" files ready for compaction. Need " + minFiles + " to initiate.");
}
candidates.clear();
}
return candidates;
}
/**
* @param candidates pre-filtrate
* @return filtered subset
* -- Default minor compaction selection algorithm: * -- Default minor compaction selection algorithm:
* choose CompactSelection from candidates -- * choose CompactSelection from candidates --
* First exclude bulk-load files if indicated in configuration. * First exclude bulk-load files if indicated in configuration.
@ -227,9 +140,11 @@ public class RatioBasedCompactionPolicy extends CompactionPolicy {
* | | | | | | | | _ | | * | | | | | | | | _ | |
* | | | | | | | | | | | | * | | | | | | | | | | | |
* | | | | | | | | | | | | * | | | | | | | | | | | |
* @param candidates pre-filtrate
* @return filtered subset
*/ */
ArrayList<StoreFile> applyCompactionPolicy(ArrayList<StoreFile> candidates, protected ArrayList<StoreFile> applyCompactionPolicy(ArrayList<StoreFile> candidates,
boolean mayUseOffPeak, boolean mayBeStuck) throws IOException { boolean mayUseOffPeak, boolean mayBeStuck) throws IOException {
if (candidates.isEmpty()) { if (candidates.isEmpty()) {
return candidates; return candidates;
} }
@ -276,114 +191,12 @@ public class RatioBasedCompactionPolicy extends CompactionPolicy {
return candidates; return candidates;
} }
/*
* @param filesToCompact Files to compact. Can be null.
* @return True if we should run a major compaction.
*/
@Override
public boolean isMajorCompaction(final Collection<StoreFile> filesToCompact)
throws IOException {
boolean result = false;
long mcTime = getNextMajorCompactTime(filesToCompact);
if (filesToCompact == null || filesToCompact.isEmpty() || mcTime == 0) {
return result;
}
// TODO: Use better method for determining stamp of last major (HBASE-2990)
long lowTimestamp = StoreUtils.getLowestTimestamp(filesToCompact);
long now = System.currentTimeMillis();
if (lowTimestamp > 0l && lowTimestamp < (now - mcTime)) {
// Major compaction time has elapsed.
long cfTtl = this.storeConfigInfo.getStoreFileTtl();
if (filesToCompact.size() == 1) {
// Single file
StoreFile sf = filesToCompact.iterator().next();
Long minTimestamp = sf.getMinimumTimestamp();
long oldest = (minTimestamp == null)
? Long.MIN_VALUE
: now - minTimestamp.longValue();
if (sf.isMajorCompaction() &&
(cfTtl == HConstants.FOREVER || oldest < cfTtl)) {
float blockLocalityIndex = sf.getHDFSBlockDistribution().getBlockLocalityIndex(
RSRpcServices.getHostname(comConf.conf, false)
);
if (blockLocalityIndex < comConf.getMinLocalityToForceCompact()) {
if (LOG.isDebugEnabled()) {
LOG.debug("Major compaction triggered on only store " + this +
"; to make hdfs blocks local, current blockLocalityIndex is " +
blockLocalityIndex + " (min " + comConf.getMinLocalityToForceCompact() +
")");
}
result = true;
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Skipping major compaction of " + this +
" because one (major) compacted file only, oldestTime " +
oldest + "ms is < ttl=" + cfTtl + " and blockLocalityIndex is " +
blockLocalityIndex + " (min " + comConf.getMinLocalityToForceCompact() +
")");
}
}
} else if (cfTtl != HConstants.FOREVER && oldest > cfTtl) {
LOG.debug("Major compaction triggered on store " + this +
", because keyvalues outdated; time since last major compaction " +
(now - lowTimestamp) + "ms");
result = true;
}
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Major compaction triggered on store " + this +
"; time since last major compaction " + (now - lowTimestamp) + "ms");
}
result = true;
}
}
return result;
}
/** /**
* Used calculation jitter * A heuristic method to decide whether to schedule a compaction request
* @param storeFiles files in the store.
* @param filesCompacting files being scheduled to compact.
* @return true to schedule a request.
*/ */
private final Random random = new Random();
/**
* @param filesToCompact
* @return When to run next major compaction
*/
public long getNextMajorCompactTime(final Collection<StoreFile> filesToCompact) {
// default = 24hrs
long ret = comConf.getMajorCompactionPeriod();
if (ret > 0) {
// default = 20% = +/- 4.8 hrs
double jitterPct = comConf.getMajorCompactionJitter();
if (jitterPct > 0) {
long jitter = Math.round(ret * jitterPct);
// deterministic jitter avoids a major compaction storm on restart
Integer seed = StoreUtils.getDeterministicRandomSeed(filesToCompact);
if (seed != null) {
// Synchronized to ensure one user of random instance at a time.
double rnd = -1;
synchronized (this) {
this.random.setSeed(seed);
rnd = this.random.nextDouble();
}
ret += jitter - Math.round(2L * jitter * rnd);
} else {
ret = 0; // If seed is null, then no storefiles == no major compaction
}
}
}
return ret;
}
/**
* @param compactionSize Total size of some compaction
* @return whether this should be a large or small compaction
*/
@Override
public boolean throttleCompaction(long compactionSize) {
return compactionSize > comConf.getThrottlePoint();
}
public boolean needsCompaction(final Collection<StoreFile> storeFiles, public boolean needsCompaction(final Collection<StoreFile> storeFiles,
final List<StoreFile> filesCompacting) { final List<StoreFile> filesCompacting) {
int numCandidates = storeFiles.size() - filesCompacting.size(); int numCandidates = storeFiles.size() - filesCompacting.size();
@ -392,7 +205,6 @@ public class RatioBasedCompactionPolicy extends CompactionPolicy {
/** /**
* Overwrite min threshold for compaction * Overwrite min threshold for compaction
* @param minThreshold min to update to
*/ */
public void setMinThreshold(int minThreshold) { public void setMinThreshold(int minThreshold) {
comConf.setMinFilesToCompact(minThreshold); comConf.setMinFilesToCompact(minThreshold);

View File

@ -0,0 +1,239 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
* law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
* BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
* for the specific language governing permissions and limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.compactions;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.collect.Collections2;
import com.google.common.collect.Lists;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Random;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.regionserver.StoreConfigInformation;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreUtils;
/**
* An abstract compaction policy that selects files in seq id order.
*/
@InterfaceAudience.Private
public abstract class SortedCompactionPolicy extends CompactionPolicy {
private static final Log LOG = LogFactory.getLog(SortedCompactionPolicy.class);
/**
 * Creates the policy; both arguments are handed unchanged to the superclass constructor.
 * @param conf configuration passed to the superclass
 * @param storeConfigInfo store configuration information passed to the superclass
 */
public SortedCompactionPolicy(Configuration conf, StoreConfigInformation storeConfigInfo) {
super(conf, storeConfigInfo);
}
/**
 * Returns the candidates that are still eligible given the files already compacting. The
 * candidates are copied into a fresh list first, so the caller's collection is not modified.
 * @param candidates all candidate files
 * @param filesCompacting files currently being compacted
 * @return the eligible files, as produced by <code>getCurrentEligibleFiles</code>
 */
public List<StoreFile> preSelectCompactionForCoprocessor(final Collection<StoreFile> candidates,
    final List<StoreFile> filesCompacting) {
  ArrayList<StoreFile> workingCopy = new ArrayList<StoreFile>(candidates);
  return getCurrentEligibleFiles(workingCopy, filesCompacting);
}
/**
 * Selects the files for a compaction and wraps them in a request built by
 * {@link #createCompactionRequest}.
 * @param candidateFiles candidate files, ordered from oldest to newest by seqId. We rely on
 * DefaultStoreFileManager to sort the files by seqId to guarantee contiguous compaction based
 * on seqId for data consistency.
 * @param filesCompacting files that are already being compacted
 * @param isUserCompaction whether the compaction was requested by a user
 * @param mayUseOffPeak whether off-peak settings may be applied; the request is flagged
 * off-peak only when the final selection is non-empty and does not cover all candidate files
 * @param forceMajor whether a major compaction was requested
 * @return subset copy of candidate list that meets compaction criteria
 * @throws IOException propagated from shouldPerformMajorCompaction or createCompactionRequest
 */
public CompactionRequest selectCompaction(Collection<StoreFile> candidateFiles,
final List<StoreFile> filesCompacting, final boolean isUserCompaction,
final boolean mayUseOffPeak, final boolean forceMajor) throws IOException {
// Preliminary compaction subject to filters
// Work on a copy: the helpers below remove entries from the list they are given.
ArrayList<StoreFile> candidateSelection = new ArrayList<StoreFile>(candidateFiles);
// Stuck and not compacting enough (estimate). It is not guaranteed that we will be
// able to compact more if stuck and compacting, because ratio policy excludes some
// non-compacting files from consideration during compaction (see getCurrentEligibleFiles).
int futureFiles = filesCompacting.isEmpty() ? 0 : 1;
boolean mayBeStuck = (candidateFiles.size() - filesCompacting.size() + futureFiles)
>= storeConfigInfo.getBlockingFileCount();
candidateSelection = getCurrentEligibleFiles(candidateSelection, filesCompacting);
LOG.debug("Selecting compaction from " + candidateFiles.size() + " store files, " +
filesCompacting.size() + " compacting, " + candidateSelection.size() +
" eligible, " + storeConfigInfo.getBlockingFileCount() + " blocking");
// If we can't have all files, we cannot do major anyway
boolean isAllFiles = candidateFiles.size() == candidateSelection.size();
if (!(forceMajor && isAllFiles)) {
candidateSelection = skipLargeFiles(candidateSelection, mayUseOffPeak);
isAllFiles = candidateFiles.size() == candidateSelection.size();
}
// Try a major compaction if this is a user-requested major compaction,
// or if we do not have too many files to compact and this was requested as a major compaction
// Note: shouldPerformMajorCompaction is only consulted when the forced-major checks before it
// evaluate to false (short-circuit evaluation).
boolean isTryingMajor = (forceMajor && isAllFiles && isUserCompaction)
|| (((forceMajor && isAllFiles) || shouldPerformMajorCompaction(candidateSelection))
&& (candidateSelection.size() < comConf.getMaxFilesToCompact()));
// Or, if there are any references among the candidates.
boolean isAfterSplit = StoreUtils.hasReferences(candidateSelection);
// The subclass builds the request; its "tryingMajor" argument is also true when references exist.
CompactionRequest result = createCompactionRequest(candidateSelection,
isTryingMajor || isAfterSplit, mayUseOffPeak, mayBeStuck);
// Trim a copy of the request's files in place, then store the trimmed list back on the request.
ArrayList<StoreFile> filesToCompact = Lists.newArrayList(result.getFiles());
removeExcessFiles(filesToCompact, isUserCompaction, isTryingMajor);
result.updateFiles(filesToCompact);
// Re-evaluate against the final selection before setting the off-peak and major flags.
isAllFiles = (candidateFiles.size() == filesToCompact.size());
result.setOffPeak(!filesToCompact.isEmpty() && !isAllFiles && mayUseOffPeak);
result.setIsMajor(isTryingMajor && isAllFiles, isAllFiles);
return result;
}
/**
 * Builds the compaction request from the candidates that selectCompaction has pre-filtered.
 * @param candidateSelection eligible candidate files
 * @param tryingMajor true when selectCompaction is attempting a major compaction or found
 * references among the candidates
 * @param mayUseOffPeak whether off-peak settings may be applied
 * @param mayBeStuck selectCompaction's estimate that the store has reached its blocking file count
 * @return the request describing the files to compact
 * @throws IOException declared for implementations that may fail while selecting files
 */
protected abstract CompactionRequest createCompactionRequest(ArrayList<StoreFile>
candidateSelection, boolean tryingMajor, boolean mayUseOffPeak, boolean mayBeStuck)
throws IOException;
/**
 * Decides whether a major compaction should be run for the given files.
 * @param filesToCompact Files to compact. Can be null.
 * @return True if we should run a major compaction.
 * @throws IOException declared for implementations that may fail while inspecting the files
 */
public abstract boolean shouldPerformMajorCompaction(final Collection<StoreFile> filesToCompact)
throws IOException;
/**
* Random number generator used to calculate the major compaction jitter.
*/
private final Random random = new Random();
/**
 * Computes the interval after which the next major compaction should run.
 * <p>
 * The configured period is returned unchanged when it is not positive or when no positive
 * jitter fraction is configured. Otherwise the period is shifted by up to +/- (period * jitter
 * fraction), using a random value seeded deterministically from the files so that the result is
 * stable for the same set of files (this avoids a major compaction storm on restart).
 * @param filesToCompact files used to derive the deterministic seed
 * @return the jittered period, or 0 when jitter is enabled but no seed can be derived (no
 *         store files means no major compaction)
 */
public long getNextMajorCompactTime(final Collection<StoreFile> filesToCompact) {
  final long period = comConf.getMajorCompactionPeriod();
  if (period <= 0) {
    return period;
  }
  final double jitterPct = comConf.getMajorCompactionJitter();
  // Written as a negated ">" so that a NaN fraction is treated as "no jitter".
  if (!(jitterPct > 0)) {
    return period;
  }
  final long jitter = Math.round(period * jitterPct);
  final Integer seed = StoreUtils.getDeterministicRandomSeed(filesToCompact);
  if (seed == null) {
    // If seed is null, then no storefiles == no major compaction
    return 0;
  }
  final double rnd;
  // Synchronized to ensure one user of random instance at a time.
  synchronized (this) {
    this.random.setSeed(seed);
    rnd = this.random.nextDouble();
  }
  return period + (jitter - Math.round(2L * jitter * rnd));
}
/**
 * Classifies a compaction as large or small by comparing its size with the configured
 * throttle point.
 * @param compactionSize Total size of some compaction
 * @return true when <code>compactionSize</code> is strictly greater than the throttle point
 */
public boolean throttleCompaction(long compactionSize) {
  boolean aboveThrottlePoint = compactionSize > comConf.getThrottlePoint();
  return aboveThrottlePoint;
}
/**
 * Decides whether a compaction request should be scheduled.
 * @param storeFiles files in the store
 * @param filesCompacting files already being compacted
 * @return true to schedule a request
 */
public abstract boolean needsCompaction(final Collection<StoreFile> storeFiles,
final List<StoreFile> filesCompacting);
/**
 * Narrows the candidates to those not already in the compaction queue. When files are
 * compacting, every candidate up to and including the last compacting file is removed, which
 * preserves contiguity (HBASE-2856).
 * @param candidateFiles candidate files; modified in place and returned
 * @param filesCompacting files currently being compacted
 * @return <code>candidateFiles</code> after removal of the ineligible prefix
 * @throws IllegalArgumentException if the last compacting file is not among the candidates
 */
protected ArrayList<StoreFile> getCurrentEligibleFiles(ArrayList<StoreFile> candidateFiles,
    final List<StoreFile> filesCompacting) {
  if (filesCompacting.isEmpty()) {
    return candidateFiles;
  }
  StoreFile lastCompacting = filesCompacting.get(filesCompacting.size() - 1);
  int lastCompactingIdx = candidateFiles.indexOf(lastCompacting);
  Preconditions.checkArgument(lastCompactingIdx != -1);
  candidateFiles.subList(0, lastCompactingIdx + 1).clear();
  return candidateFiles;
}
/**
 * Removes, in place, the leading run of candidates that are too large to compact. The scan
 * stops at the first reference file (references are always kept) or at the first file whose
 * reader length does not exceed the configured maximum compaction size.
 * @param candidates pre-filtered candidates; modified in place and returned
 * @param mayUseOffpeak passed through to the maximum-compaction-size lookup
 * @return {@code candidates} without the leading over-sized, non-reference files
 */
protected ArrayList<StoreFile> skipLargeFiles(ArrayList<StoreFile> candidates,
    boolean mayUseOffpeak) {
  int skipped = 0;
  for (StoreFile candidate : candidates) {
    if (candidate.isReference()
        || candidate.getReader().length() <= comConf.getMaxCompactSize(mayUseOffpeak)) {
      break;
    }
    skipped++;
  }
  if (skipped > 0) {
    LOG.debug("Some files are too large. Excluding " + skipped
        + " files from compaction candidates");
    candidates.subList(0, skipped).clear();
  }
  return candidates;
}
/**
 * Removes, in place, every candidate that reports
 * {@link StoreFile#excludeFromMinorCompaction()} (per the original note: bulk load files, if so
 * configured).
 * <p>
 * The files are removed directly by index instead of handing a live filtered view of
 * {@code candidates} back to {@code candidates.removeAll(...)}. Such a view re-scans the list on
 * every {@code contains} probe while the list is being mutated, which is quadratic and relies on
 * implementation details of the list.
 * @param candidates pre-filtered candidates; modified in place and returned
 * @return {@code candidates} without the files excluded from minor compaction
 */
protected ArrayList<StoreFile> filterBulk(ArrayList<StoreFile> candidates) {
  // Walk backwards so that removing an element never shifts an index still to be visited.
  for (int i = candidates.size() - 1; i >= 0; i--) {
    if (candidates.get(i).excludeFromMinorCompaction()) {
      candidates.remove(i);
    }
  }
  return candidates;
}
/**
 * Caps, in place, the number of candidates at the configured maximum number of files to
 * compact. A user-requested major compaction is exempt: it is only logged and left untouched.
 * @param candidates pre-filtered candidates; files beyond the maximum are dropped from the tail
 * @param isUserCompaction whether the compaction was requested by a user
 * @param isMajorCompaction whether the compaction is a major compaction
 */
protected void removeExcessFiles(ArrayList<StoreFile> candidates,
    boolean isUserCompaction, boolean isMajorCompaction) {
  final int surplus = candidates.size() - comConf.getMaxFilesToCompact();
  if (surplus <= 0) {
    return;
  }
  if (isMajorCompaction && isUserCompaction) {
    LOG.debug("Warning, compacting more than " + comConf.getMaxFilesToCompact()
        + " files because of a user-requested major compaction");
    return;
  }
  LOG.debug("Too many admissible files. Excluding " + surplus
      + " files from compaction candidates");
  candidates.subList(comConf.getMaxFilesToCompact(), candidates.size()).clear();
}
/**
 * Discards the whole selection when it holds fewer files than required.
 * @param candidates pre-filtered candidates; cleared in place when too few
 * @param minFiles minimum number of files needed to start a compaction
 * @return {@code candidates}, emptied if it contained fewer than {@code minFiles} files
 */
protected ArrayList<StoreFile> checkMinFilesCriteria(ArrayList<StoreFile> candidates,
    int minFiles) {
  if (candidates.size() >= minFiles) {
    return candidates;
  }
  if (LOG.isDebugEnabled()) {
    LOG.debug("Not compacting files because we only have " + candidates.size()
        + " files ready for compaction. Need " + minFiles + " to initiate.");
  }
  candidates.clear();
  return candidates;
}
}

View File

@ -166,7 +166,8 @@ public class StripeCompactionPolicy extends CompactionPolicy {
} }
@Override @Override
public boolean isMajorCompaction(Collection<StoreFile> filesToCompact) throws IOException { public boolean shouldPerformMajorCompaction(Collection<StoreFile> filesToCompact)
throws IOException {
return false; // there's never a major compaction! return false; // there's never a major compaction!
} }

View File

@ -25,8 +25,11 @@ import java.util.TreeMap;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.io.hfile.CacheConfig; import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
/** A mock used so our tests don't deal with actual StoreFiles */ /** A mock used so our tests don't deal with actual StoreFiles */
public class MockStoreFile extends StoreFile { public class MockStoreFile extends StoreFile {
@ -38,6 +41,9 @@ public class MockStoreFile extends StoreFile {
byte[] splitPoint = null; byte[] splitPoint = null;
TimeRangeTracker timeRangeTracker; TimeRangeTracker timeRangeTracker;
long entryCount; long entryCount;
boolean isMajor;
HDFSBlocksDistribution hdfsBlocksDistribution;
long modificationTime;
MockStoreFile(HBaseTestingUtility testUtil, Path testPath, MockStoreFile(HBaseTestingUtility testUtil, Path testPath,
long length, long ageInDisk, boolean isRef, long sequenceid) throws IOException { long length, long ageInDisk, boolean isRef, long sequenceid) throws IOException {
@ -47,6 +53,11 @@ public class MockStoreFile extends StoreFile {
this.isRef = isRef; this.isRef = isRef;
this.ageInDisk = ageInDisk; this.ageInDisk = ageInDisk;
this.sequenceid = sequenceid; this.sequenceid = sequenceid;
this.isMajor = false;
hdfsBlocksDistribution = new HDFSBlocksDistribution();
hdfsBlocksDistribution.addHostsAndBlockWeight(
new String[] { RSRpcServices.getHostname(testUtil.getConfiguration(), false) }, 1);
modificationTime = EnvironmentEdgeManager.currentTime();
} }
void setLength(long newLen) { void setLength(long newLen) {
@ -65,7 +76,11 @@ public class MockStoreFile extends StoreFile {
@Override @Override
public boolean isMajorCompaction() { public boolean isMajorCompaction() {
return false; return isMajor;
}
public void setIsMajor(boolean isMajor) {
this.isMajor = isMajor;
} }
@Override @Override
@ -74,12 +89,7 @@ public class MockStoreFile extends StoreFile {
} }
@Override @Override
boolean isBulkLoadResult() { public boolean isBulkLoadResult() {
return false;
}
@Override
public boolean isCompactedAway() {
return false; return false;
} }
@ -102,14 +112,22 @@ public class MockStoreFile extends StoreFile {
public Long getMinimumTimestamp() { public Long getMinimumTimestamp() {
return (timeRangeTracker == null) ? return (timeRangeTracker == null) ?
null : null : timeRangeTracker.getMinimumTimestamp();
timeRangeTracker.getMinimumTimestamp();
} }
public Long getMaximumTimestamp() { public Long getMaximumTimestamp() {
return (timeRangeTracker == null) ? return (timeRangeTracker == null) ?
null : null : timeRangeTracker.getMaximumTimestamp();
timeRangeTracker.getMaximumTimestamp(); }
@Override
public long getModificationTimeStamp() {
return modificationTime;
}
@Override
public HDFSBlocksDistribution getHDFSBlockDistribution() {
return hdfsBlocksDistribution;
} }
@Override @Override

View File

@ -0,0 +1,325 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
import org.apache.hadoop.hbase.regionserver.compactions.DateTieredCompactionPolicy;
import org.apache.hadoop.hbase.regionserver.compactions.DateTieredCompactionRequest;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
import org.junit.Assert;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category(SmallTests.class)
public class TestDateTieredCompactionPolicy extends TestCompactionPolicy {
ArrayList<StoreFile> sfCreate(long[] minTimestamps, long[] maxTimestamps, long[] sizes)
throws IOException {
ManualEnvironmentEdge timeMachine = new ManualEnvironmentEdge();
EnvironmentEdgeManager.injectEdge(timeMachine);
// Has to be > 0 and < now.
timeMachine.setValue(1);
ArrayList<Long> ageInDisk = new ArrayList<Long>();
for (int i = 0; i < sizes.length; i++) {
ageInDisk.add(0L);
}
ArrayList<StoreFile> ret = Lists.newArrayList();
for (int i = 0; i < sizes.length; i++) {
MockStoreFile msf =
new MockStoreFile(TEST_UTIL, TEST_FILE, sizes[i], ageInDisk.get(i), false, i);
msf.setTimeRangeTracker(new TimeRangeTracker(minTimestamps[i], maxTimestamps[i]));
ret.add(msf);
}
return ret;
}
@Override
protected void config() {
super.config();
// Set up policy
conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY,
"org.apache.hadoop.hbase.regionserver.DateTieredStoreEngine");
conf.setLong(CompactionConfiguration.MAX_AGE_MILLIS_KEY, 100);
conf.setLong(CompactionConfiguration.INCOMING_WINDOW_MIN_KEY, 3);
conf.setLong(CompactionConfiguration.BASE_WINDOW_MILLIS_KEY, 6);
conf.setInt(CompactionConfiguration.WINDOWS_PER_TIER_KEY, 4);
conf.setBoolean(CompactionConfiguration.SINGLE_OUTPUT_FOR_MINOR_COMPACTION_KEY, false);
// Special settings for compaction policy per window
this.conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 2);
this.conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY, 12);
this.conf.setFloat(CompactionConfiguration.HBASE_HSTORE_COMPACTION_RATIO_KEY, 1.2F);
conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 20);
conf.setLong(HConstants.MAJOR_COMPACTION_PERIOD, 10);
}
void compactEquals(long now, ArrayList<StoreFile> candidates, long[] expectedFileSizes,
long[] expectedBoundaries, boolean isMajor, boolean toCompact) throws IOException {
ManualEnvironmentEdge timeMachine = new ManualEnvironmentEdge();
EnvironmentEdgeManager.injectEdge(timeMachine);
timeMachine.setValue(now);
DateTieredCompactionRequest request;
if (isMajor) {
for (StoreFile file : candidates) {
((MockStoreFile)file).setIsMajor(true);
}
Assert.assertEquals(toCompact, ((DateTieredCompactionPolicy) store.storeEngine.getCompactionPolicy())
.shouldPerformMajorCompaction(candidates));
request = (DateTieredCompactionRequest) ((DateTieredCompactionPolicy) store.storeEngine
.getCompactionPolicy()).selectMajorCompaction(candidates);
} else {
Assert.assertEquals(toCompact, ((DateTieredCompactionPolicy) store.storeEngine.getCompactionPolicy())
.needsCompaction(candidates, ImmutableList.<StoreFile> of()));
request = (DateTieredCompactionRequest) ((DateTieredCompactionPolicy) store.storeEngine
.getCompactionPolicy()).selectMinorCompaction(candidates, false, false);
}
List<StoreFile> actual = Lists.newArrayList(request.getFiles());
Assert.assertEquals(Arrays.toString(expectedFileSizes), Arrays.toString(getSizes(actual)));
Assert.assertEquals(Arrays.toString(expectedBoundaries),
Arrays.toString(request.getBoundaries().toArray()));
}
/**
* Test for incoming window
* @throws IOException with error
*/
@Test
public void incomingWindow() throws IOException {
long[] minTimestamps = new long[] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 };
long[] maxTimestamps = new long[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15 };
long[] sizes = new long[] { 30, 31, 32, 33, 34, 20, 21, 22, 23, 24, 25, 10, 11, 12, 13 };
compactEquals(16, sfCreate(minTimestamps, maxTimestamps, sizes), new long[] { 10, 11, 12, 13 },
new long[] { Long.MIN_VALUE, 12 }, false, true);
}
/**
* Not enough files in incoming window
* @throws IOException with error
*/
@Test
public void NotIncomingWindow() throws IOException {
long[] minTimestamps = new long[] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 };
long[] maxTimestamps = new long[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13 };
long[] sizes = new long[] { 30, 31, 32, 33, 34, 20, 21, 22, 23, 24, 25, 10, 11 };
compactEquals(16, sfCreate(minTimestamps, maxTimestamps, sizes), new long[] { 20, 21, 22, 23,
24, 25 }, new long[] { Long.MIN_VALUE, 6}, false, true);
}
/**
* Test for file on the upper bound of incoming window
* @throws IOException with error
*/
@Test
public void OnUpperBoundOfIncomingWindow() throws IOException {
long[] minTimestamps = new long[] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 };
long[] maxTimestamps = new long[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 18 };
long[] sizes = new long[] { 30, 31, 32, 33, 34, 20, 21, 22, 23, 24, 25, 10, 11, 12, 13 };
compactEquals(16, sfCreate(minTimestamps, maxTimestamps, sizes), new long[] { 10, 11, 12, 13 },
new long[] { Long.MIN_VALUE, 12 }, false, true);
}
/**
* Test for file newer than incoming window
* @throws IOException with error
*/
@Test
public void NewerThanIncomingWindow() throws IOException {
long[] minTimestamps = new long[] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 };
long[] maxTimestamps = new long[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 19 };
long[] sizes = new long[] { 30, 31, 32, 33, 34, 20, 21, 22, 23, 24, 25, 10, 11, 12, 13 };
compactEquals(16, sfCreate(minTimestamps, maxTimestamps, sizes), new long[] { 10, 11, 12, 13 },
new long[] { Long.MIN_VALUE, 12}, false, true);
}
/**
* If there is no T1 window, we don't build T2
* @throws IOException with error
*/
@Test
public void NoT2() throws IOException {
long[] minTimestamps = new long[] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 };
long[] maxTimestamps = new long[] { 44, 60, 61, 97, 100, 193 };
long[] sizes = new long[] { 0, 20, 21, 22, 23, 1 };
compactEquals(194, sfCreate(minTimestamps, maxTimestamps, sizes), new long[] { 22, 23 },
new long[] { Long.MIN_VALUE, 96}, false, true);
}
@Test
public void T1() throws IOException {
long[] minTimestamps = new long[] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 };
long[] maxTimestamps = new long[] { 44, 60, 61, 96, 100, 104, 120, 124, 143, 145, 157 };
long[] sizes = new long[] { 0, 50, 51, 40, 41, 42, 30, 31, 32, 2, 1 };
compactEquals(161, sfCreate(minTimestamps, maxTimestamps, sizes), new long[] { 30, 31, 32 },
new long[] { Long.MIN_VALUE, 120 }, false, true);
}
/**
* Apply exploring logic on non-incoming window
* @throws IOException with error
*/
@Test
public void RatioT0() throws IOException {
long[] minTimestamps = new long[] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 };
long[] maxTimestamps = new long[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12 };
long[] sizes = new long[] { 30, 31, 32, 33, 34, 20, 21, 22, 280, 23, 24, 1 };
compactEquals(16, sfCreate(minTimestamps, maxTimestamps, sizes), new long[] { 20, 21, 22 },
new long[] { Long.MIN_VALUE }, false, true);
}
/**
* Also apply ratio-based logic on t2 window
* @throws IOException with error
*/
@Test
public void RatioT2() throws IOException {
long[] minTimestamps = new long[] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 };
long[] maxTimestamps = new long[] { 44, 60, 61, 96, 100, 104, 120, 124, 143, 145, 157 };
long[] sizes = new long[] { 0, 50, 51, 40, 41, 42, 350, 30, 31, 2, 1 };
compactEquals(161, sfCreate(minTimestamps, maxTimestamps, sizes), new long[] { 30, 31 },
new long[] { Long.MIN_VALUE }, false, true);
}
/**
* The next compaction call after testTieredCompactionRatioT0 is compacted
* @throws IOException with error
*/
@Test
public void RatioT0Next() throws IOException {
long[] minTimestamps = new long[] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 };
long[] maxTimestamps = new long[] { 1, 2, 3, 4, 5, 8, 9, 10, 11, 12 };
long[] sizes = new long[] { 30, 31, 32, 33, 34, 22, 280, 23, 24, 1 };
compactEquals(16, sfCreate(minTimestamps, maxTimestamps, sizes), new long[] { 23, 24 },
new long[] { Long.MIN_VALUE }, false, true);
}
/**
* Older than now(161) - maxAge(100)
* @throws IOException with error
*/
@Test
public void olderThanMaxAge() throws IOException {
long[] minTimestamps = new long[] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 };
long[] maxTimestamps = new long[] { 44, 60, 61, 96, 100, 104, 105, 106, 113, 145, 157 };
long[] sizes = new long[] { 0, 50, 51, 40, 41, 42, 33, 30, 31, 2, 1 };
compactEquals(161, sfCreate(minTimestamps, maxTimestamps, sizes), new long[] { 40, 41, 42, 33,
30, 31 }, new long[] { Long.MIN_VALUE, 96 }, false, true);
}
/**
* Out-of-order data
* @throws IOException with error
*/
@Test
public void outOfOrder() throws IOException {
long[] minTimestamps = new long[] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 };
long[] maxTimestamps = new long[] { 0, 13, 3, 10, 11, 1, 2, 12, 14, 15 };
long[] sizes = new long[] { 30, 31, 32, 33, 34, 22, 28, 23, 24, 1 };
compactEquals(16, sfCreate(minTimestamps, maxTimestamps, sizes), new long[] { 31, 32, 33, 34,
22, 28, 23, 24, 1 }, new long[] { Long.MIN_VALUE, 12 }, false, true);
}
/**
* Negative epoch time
* @throws IOException with error
*/
@Test
public void negativeEpochtime() throws IOException {
long[] minTimestamps =
new long[] { -1000, -1000, -1000, -1000, -1000, -1000, -1000, -1000, -1000, -1000 };
long[] maxTimestamps = new long[] { -28, -11, -10, -9, -8, -7, -6, -5, -4, -3 };
long[] sizes = new long[] { 30, 31, 32, 33, 34, 22, 25, 23, 24, 1 };
compactEquals(1, sfCreate(minTimestamps, maxTimestamps, sizes),
new long[] { 31, 32, 33, 34, 22, 25, 23, 24, 1 },
new long[] { Long.MIN_VALUE, -24 }, false, true);
}
/**
* Major compaction
* @throws IOException with error
*/
@Test
public void majorCompation() throws IOException {
long[] minTimestamps = new long[] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 };
long[] maxTimestamps = new long[] { 44, 60, 61, 96, 100, 104, 105, 106, 113, 145, 157 };
long[] sizes = new long[] { 0, 50, 51, 40, 41, 42, 33, 30, 31, 2, 1 };
compactEquals(161, sfCreate(minTimestamps, maxTimestamps, sizes), new long[] { 0, 50, 51, 40,41, 42,
33, 30, 31, 2, 1 }, new long[] { Long.MIN_VALUE, 24, 48, 72, 96, 120, 144, 150, 156 }, true, true);
}
/**
* Major compaction with negative numbers
* @throws IOException with error
*/
@Test
public void negativeForMajor() throws IOException {
long[] minTimestamps =
new long[] { -155, -100, -100, -100, -100, -100, -100, -100, -100, -100, -100 };
long[] maxTimestamps = new long[] { -8, -7, -6, -5, -4, -3, -2, -1, 0, 6, 13 };
long[] sizes = new long[] { 0, 50, 51, 40, 41, 42, 33, 30, 31, 2, 1 };
compactEquals(16, sfCreate(minTimestamps, maxTimestamps, sizes), new long[] { 0, 50, 51, 40,
41, 42, 33, 30, 31, 2, 1 },
new long[] { Long.MIN_VALUE, -144, -120, -96, -72, -48, -24, 0, 6, 12 }, true, true);
}
/**
* Major compaction with maximum values
* @throws IOException with error
*/
@Test
public void maxValuesForMajor() throws IOException {
conf.setLong(CompactionConfiguration.BASE_WINDOW_MILLIS_KEY, Long.MAX_VALUE / 2);
conf.setInt(CompactionConfiguration.WINDOWS_PER_TIER_KEY, 2);
store.storeEngine.getCompactionPolicy().setConf(conf);
long[] minTimestamps =
new long[] { Long.MIN_VALUE, -100 };
long[] maxTimestamps = new long[] { -8, Long.MAX_VALUE };
long[] sizes = new long[] { 0, 1 };
compactEquals(Long.MAX_VALUE, sfCreate(minTimestamps, maxTimestamps, sizes),
new long[] { 0, 1 },
new long[] { Long.MIN_VALUE, -4611686018427387903L, 0, 4611686018427387903L,
9223372036854775806L }, true, true);
}
}

View File

@ -42,7 +42,7 @@ public class EverythingPolicy extends RatioBasedCompactionPolicy {
} }
@Override @Override
final ArrayList<StoreFile> applyCompactionPolicy(final ArrayList<StoreFile> candidates, protected final ArrayList<StoreFile> applyCompactionPolicy(final ArrayList<StoreFile> candidates,
final boolean mayUseOffPeak, final boolean mayBeStuck) throws IOException { final boolean mayUseOffPeak, final boolean mayBeStuck) throws IOException {
if (candidates.size() < comConf.getMinFilesToCompact()) { if (candidates.size() < comConf.getMinFilesToCompact()) {