HBASE-15181 A simple implementation of date based tiered compaction (Clara Xiong)

This commit is contained in:
tedyu 2016-02-27 07:33:19 -08:00
parent d233e09c14
commit 0cd299dfca
8 changed files with 829 additions and 185 deletions

View File

@ -741,6 +741,13 @@ public class StoreFile {
getReader().timeRangeTracker.getMinimumTimestamp();
}
public Long getMaximumTimestamp() {
return (getReader().timeRangeTracker == null) ?
null :
getReader().timeRangeTracker.getMaximumTimestamp();
}
/**
* Gets the approximate mid-point of this file that is optimal for use in splitting it.
* @param comparator Comparator used to compare KVs.

View File

@ -21,9 +21,9 @@ package org.apache.hadoop.hbase.regionserver.compactions;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.regionserver.StoreConfigInformation;
/**
@ -71,6 +71,23 @@ public class CompactionConfiguration {
public static final String HBASE_HFILE_COMPACTION_DISCHARGER_INTERVAL =
"hbase.hfile.compaction.discharger.interval";
/*
* The epoch time length for the windows we no longer compact
*/
public static final String MAX_AGE_MILLIS_KEY =
"hbase.hstore.compaction.date.tiered.max.storefile.age.millis";
public static final String BASE_WINDOW_MILLIS_KEY =
"hbase.hstore.compaction.date.tiered.base.window.millis";
public static final String WINDOWS_PER_TIER_KEY =
"hbase.hstore.compaction.date.tiered.windows.per.tier";
public static final String INCOMING_WINDOW_MIN_KEY =
"hbase.hstore.compaction.date.tiered.incoming.window.min";
public static final String COMPACTION_POLICY_CLASS_FOR_TIERED_WINDOWS_KEY =
"hbase.hstore.compaction.date.tiered.window.policy.class";
private static final Class<? extends RatioBasedCompactionPolicy>
DEFAULT_TIER_COMPACTION_POLICY_CLASS = ExploringCompactionPolicy.class;
Configuration conf;
StoreConfigInformation storeConfigInfo;
@ -79,13 +96,19 @@ public class CompactionConfiguration {
private final long maxCompactSize;
private final long offPeakMaxCompactSize;
private final long minCompactSize;
private final int minFilesToCompact;
/** This one can be update **/
private int minFilesToCompact;
private final int maxFilesToCompact;
private final double compactionRatio;
private final long throttlePoint;
private final long majorCompactionPeriod;
private final float majorCompactionJitter;
private final float minLocalityToForceCompact;
private final long maxStoreFileAgeMillis;
private final long baseWindowMillis;
private final int windowsPerTier;
private final int incomingWindowMin;
private final String compactionPolicyForTieredWindow;
CompactionConfiguration(Configuration conf, StoreConfigInformation storeConfigInfo) {
this.conf = conf;
@ -108,6 +131,13 @@ public class CompactionConfiguration {
// Make it 0.5 so jitter has us fall evenly either side of when the compaction should run
majorCompactionJitter = conf.getFloat("hbase.hregion.majorcompaction.jitter", 0.50F);
minLocalityToForceCompact = conf.getFloat(HBASE_HSTORE_MIN_LOCALITY_TO_SKIP_MAJOR_COMPACT, 0f);
maxStoreFileAgeMillis = conf.getLong(MAX_AGE_MILLIS_KEY, Long.MAX_VALUE);
baseWindowMillis = conf.getLong(BASE_WINDOW_MILLIS_KEY, 3600000 * 6);
windowsPerTier = conf.getInt(WINDOWS_PER_TIER_KEY, 4);
incomingWindowMin = conf.getInt(INCOMING_WINDOW_MIN_KEY, 6);
compactionPolicyForTieredWindow = conf.get(COMPACTION_POLICY_CLASS_FOR_TIERED_WINDOWS_KEY,
DEFAULT_TIER_COMPACTION_POLICY_CLASS.getName());
LOG.info(this);
}
@ -115,7 +145,9 @@ public class CompactionConfiguration {
public String toString() {
return String.format(
"size [%d, %d, %d); files [%d, %d); ratio %f; off-peak ratio %f; throttle point %d;"
+ " major period %d, major jitter %f, min locality to compact %f",
+ " major period %d, major jitter %f, min locality to compact %f;"
+ " tiered compaction: max_age %d, base window in milliseconds %d, windows per tier %d,"
+ "incoming window min %d",
minCompactSize,
maxCompactSize,
offPeakMaxCompactSize,
@ -126,7 +158,11 @@ public class CompactionConfiguration {
throttlePoint,
majorCompactionPeriod,
majorCompactionJitter,
minLocalityToForceCompact);
minLocalityToForceCompact,
maxStoreFileAgeMillis,
baseWindowMillis,
windowsPerTier,
incomingWindowMin);
}
/**
@ -150,6 +186,14 @@ public class CompactionConfiguration {
return minFilesToCompact;
}
/**
* Set upper bound on number of files to be included in minor compactions
* @param threshold value to set to
*/
public void setMinFilesToCompact(int threshold) {
minFilesToCompact = threshold;
}
/**
* @return upper bound on number of files to be included in minor compactions
*/
@ -214,4 +258,24 @@ public class CompactionConfiguration {
return getMaxCompactSize();
}
}
public long getMaxStoreFileAgeMillis() {
return maxStoreFileAgeMillis;
}
public long getBaseWindowMillis() {
return baseWindowMillis;
}
public int getWindowsPerTier() {
return windowsPerTier;
}
public int getIncomingWindowMin() {
return incomingWindowMin;
}
public String getCompactionPolicyForTieredWindow() {
return compactionPolicyForTieredWindow;
}
}

View File

@ -0,0 +1,294 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.compactions;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.PeekingIterator;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.regionserver.StoreConfigInformation;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.ReflectionUtils;
/**
* HBASE-15181 This is a simple implementation of date-based tiered compaction similar to
* Cassandra's for the following benefits:
* 1. Improve date-range-based scan by structuring store files in date-based tiered layout.
* 2. Reduce compaction overhead.
* 3. Improve TTL efficiency.
* Perfect fit for the use cases that:
* 1. has mostly date-based data write and scan and a focus on the most recent data.
* 2. never or rarely deletes data. Out-of-order writes are handled gracefully. Time range
* overlapping among store files is tolerated and the performance impact is minimized. Configuration
* can be set at hbase-site or overriden at per-table or per-column-famly level by hbase shell.
* Design spec is at
* https://docs.google.com/document/d/1_AmlNb2N8Us1xICsTeGDLKIqL6T-oHoRLZ323MG_uy8/
*/
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
public class DateTieredCompactionPolicy extends RatioBasedCompactionPolicy {
private static final Log LOG = LogFactory.getLog(DateTieredCompactionPolicy.class);
private RatioBasedCompactionPolicy compactionPolicyPerWindow;
public DateTieredCompactionPolicy(Configuration conf, StoreConfigInformation storeConfigInfo)
throws IOException {
super(conf, storeConfigInfo);
try {
compactionPolicyPerWindow =
ReflectionUtils.instantiateWithCustomCtor(comConf.getCompactionPolicyForTieredWindow(),
new Class[] { Configuration.class, StoreConfigInformation.class }, new Object[] { conf,
storeConfigInfo });
} catch (Exception e) {
throw new IOException("Unable to load configured compaction policy '"
+ comConf.getCompactionPolicyForTieredWindow() + "'", e);
}
}
@Override
public boolean isMajorCompaction(Collection<StoreFile> filesToCompact) throws IOException {
// Never do major compaction unless forced
return false;
}
@Override
/**
* Heuristics for guessing whether we need compaction.
*/
public boolean needsCompaction(final Collection<StoreFile> storeFiles,
final List<StoreFile> filesCompacting) {
return needsCompaction(storeFiles, filesCompacting, EnvironmentEdgeManager.currentTime());
}
@VisibleForTesting
public boolean needsCompaction(final Collection<StoreFile> storeFiles,
final List<StoreFile> filesCompacting, long now) {
if (!super.needsCompaction(storeFiles, filesCompacting)) {
return false;
}
ArrayList<StoreFile> candidates = new ArrayList<StoreFile>(storeFiles);
candidates = filterBulk(candidates);
candidates = skipLargeFiles(candidates, true);
try {
candidates = applyCompactionPolicy(candidates, true, false, now);
} catch (Exception e) {
LOG.error("Can not check for compaction: ", e);
return false;
}
return candidates != null && candidates.size() >= comConf.getMinFilesToCompact();
}
/**
* Could return null if no candidates are found
*/
@Override
public ArrayList<StoreFile> applyCompactionPolicy(ArrayList<StoreFile> candidates,
boolean mayUseOffPeak, boolean mayBeStuck) throws IOException {
return applyCompactionPolicy(candidates, mayUseOffPeak, mayBeStuck,
EnvironmentEdgeManager.currentTime());
}
/**
* Input candidates are sorted from oldest to newest by seqId. Could return null if no candidates
* are found.
*/
@VisibleForTesting
public ArrayList<StoreFile> applyCompactionPolicy(ArrayList<StoreFile> candidates,
boolean mayUseOffPeak, boolean mayBeStuck, long now) throws IOException {
Iterable<StoreFile> candidatesInWindow =
filterOldStoreFiles(Lists.newArrayList(candidates), comConf.getMaxStoreFileAgeMillis(), now);
List<ArrayList<StoreFile>> buckets =
partitionFilesToBuckets(candidatesInWindow, comConf.getBaseWindowMillis(),
comConf.getWindowsPerTier(), now);
LOG.debug("Compaction buckets are: " + buckets);
return newestBucket(buckets, comConf.getIncomingWindowMin(), now,
comConf.getBaseWindowMillis(), mayUseOffPeak);
}
/**
* @param buckets the list of buckets, sorted from newest to oldest, from which to return the
* newest bucket within thresholds.
* @param incomingWindowThreshold minimum number of storeFiles in a bucket to qualify.
* @param maxThreshold maximum number of storeFiles to compact at once (the returned bucket will
* be trimmed down to this).
* @return a bucket (a list of store files within a window to be compacted).
* @throws IOException error
*/
private ArrayList<StoreFile> newestBucket(List<ArrayList<StoreFile>> buckets,
int incomingWindowThreshold, long now, long baseWindowMillis, boolean mayUseOffPeak)
throws IOException {
Window incomingWindow = getInitialWindow(now, baseWindowMillis);
for (ArrayList<StoreFile> bucket : buckets) {
int minThreshold =
incomingWindow.compareToTimestamp(bucket.get(0).getMaximumTimestamp()) <= 0 ? comConf
.getIncomingWindowMin() : comConf.getMinFilesToCompact();
compactionPolicyPerWindow.setMinThreshold(minThreshold);
ArrayList<StoreFile> candidates =
compactionPolicyPerWindow.applyCompactionPolicy(bucket, mayUseOffPeak, false);
if (candidates != null && !candidates.isEmpty()) {
return candidates;
}
}
return null;
}
/**
* We receive store files sorted in ascending order by seqId then scan the list of files. If the
* current file has a maxTimestamp older than last known maximum, treat this file as it carries
* the last known maximum. This way both seqId and timestamp are in the same order. If files carry
* the same maxTimestamps, they are ordered by seqId. We then reverse the list so they are ordered
* by seqId and maxTimestamp in decending order and build the time windows. All the out-of-order
* data into the same compaction windows, guaranteeing contiguous compaction based on sequence id.
*/
private static List<ArrayList<StoreFile>> partitionFilesToBuckets(Iterable<StoreFile> storeFiles,
long baseWindowSizeMillis, int windowsPerTier, long now) {
List<ArrayList<StoreFile>> buckets = Lists.newArrayList();
Window window = getInitialWindow(now, baseWindowSizeMillis);
List<Pair<StoreFile, Long>> storefileMaxTimestampPairs =
Lists.newArrayListWithCapacity(Iterables.size(storeFiles));
long maxTimestampSeen = Long.MIN_VALUE;
for (StoreFile storeFile : storeFiles) {
// if there is out-of-order data,
// we put them in the same window as the last file in increasing order
maxTimestampSeen = Math.max(maxTimestampSeen, storeFile.getMaximumTimestamp());
storefileMaxTimestampPairs.add(new Pair<StoreFile, Long>(storeFile, maxTimestampSeen));
}
Collections.reverse(storefileMaxTimestampPairs);
PeekingIterator<Pair<StoreFile, Long>> it =
Iterators.peekingIterator(storefileMaxTimestampPairs.iterator());
while (it.hasNext()) {
int compResult = window.compareToTimestamp(it.peek().getSecond());
if (compResult > 0) {
// If the file is too old for the window, switch to the next window
window = window.nextWindow(windowsPerTier);
} else {
// The file is within the target window
ArrayList<StoreFile> bucket = Lists.newArrayList();
// Add all files in the same window to current bucket. For incoming window
// we tolerate files with future data although it is sub-optimal
while (it.hasNext() && window.compareToTimestamp(it.peek().getSecond()) <= 0) {
bucket.add(it.next().getFirst());
}
if (!bucket.isEmpty()) {
buckets.add(bucket);
}
}
}
return buckets;
}
/**
* Removes all store files with max timestamp older than (current - maxAge).
* @param storeFiles all store files to consider
* @param maxAge the age in milliseconds when a store file stops participating in compaction.
* @param now current time. store files with max timestamp less than (now - maxAge) are filtered.
* @return a list of storeFiles with the store file older than maxAge excluded
*/
private static Iterable<StoreFile> filterOldStoreFiles(List<StoreFile> storeFiles, long maxAge,
long now) {
if (maxAge == 0) {
return ImmutableList.of();
}
final long cutoff = now - maxAge;
return Iterables.filter(storeFiles, new Predicate<StoreFile>() {
@Override
public boolean apply(StoreFile storeFile) {
// Known findbugs issue to guava. SuppressWarning or Nonnull annotation don't work.
if (storeFile == null) {
throw new NullPointerException();
}
return storeFile.getMaximumTimestamp() >= cutoff;
}
});
}
private static Window getInitialWindow(long now, long timeUnit) {
return new Window(timeUnit, now / timeUnit);
}
/**
* This is the class we use to partition from epoch time to now into tiers of exponential sizes of
* windows.
*/
private static final class Window {
/**
* How big a range of timestamps fit inside the window in milliseconds.
*/
private final long windowMillis;
/**
* A timestamp t is within the window iff t / size == divPosition.
*/
private final long divPosition;
private Window(long baseWindowMillis, long divPosition) {
this.windowMillis = baseWindowMillis;
this.divPosition = divPosition;
}
/**
* Compares the window to a timestamp.
* @param timestamp the timestamp to compare.
* @return a negative integer, zero, or a positive integer as the window lies before, covering,
* or after than the timestamp.
*/
public int compareToTimestamp(long timestamp) {
long pos = timestamp / windowMillis;
return divPosition == pos ? 0 : divPosition < pos ? -1 : 1;
}
/**
* Move to the new window of the same tier or of the next tier, which represents an earlier time
* span.
* @param windowsPerTier The number of contiguous windows that will have the same size. Windows
* following those will be <code>tierBase</code> times as big.
* @return The next window
*/
public Window nextWindow(int windowsPerTier) {
if (divPosition % windowsPerTier > 0) {
return new Window(windowMillis, divPosition - 1);
} else {
return new Window(windowMillis * windowsPerTier, divPosition / windowsPerTier - 1);
}
}
}
}

View File

@ -73,7 +73,9 @@ public class RatioBasedCompactionPolicy extends CompactionPolicy {
}
/**
* @param candidateFiles candidate files, ordered from oldest to newest. All files in store.
* @param candidateFiles candidate files, ordered from oldest to newest by seqId. We rely on
* DefaultStoreFileManager to sort the files by seqId to guarantee contiguous compaction based
* on seqId for data consistency.
* @return subset copy of candidate list that meets compaction criteria
* @throws java.io.IOException
*/
@ -128,7 +130,7 @@ public class RatioBasedCompactionPolicy extends CompactionPolicy {
* exclude all files above maxCompactSize
* Also save all references. We MUST compact them
*/
private ArrayList<StoreFile> skipLargeFiles(ArrayList<StoreFile> candidates,
protected ArrayList<StoreFile> skipLargeFiles(ArrayList<StoreFile> candidates,
boolean mayUseOffpeak) {
int pos = 0;
while (pos < candidates.size() && !candidates.get(pos).isReference()
@ -148,7 +150,7 @@ public class RatioBasedCompactionPolicy extends CompactionPolicy {
* @return filtered subset
* exclude all bulk load files if configured
*/
private ArrayList<StoreFile> filterBulk(ArrayList<StoreFile> candidates) {
protected ArrayList<StoreFile> filterBulk(ArrayList<StoreFile> candidates) {
candidates.removeAll(Collections2.filter(candidates,
new Predicate<StoreFile>() {
@Override
@ -184,7 +186,7 @@ public class RatioBasedCompactionPolicy extends CompactionPolicy {
* @return filtered subset
* forget the compactionSelection if we don't have enough files
*/
private ArrayList<StoreFile> checkMinFilesCriteria(ArrayList<StoreFile> candidates) {
protected ArrayList<StoreFile> checkMinFilesCriteria(ArrayList<StoreFile> candidates) {
int minFiles = comConf.getMinFilesToCompact();
if (candidates.size() < minFiles) {
if(LOG.isDebugEnabled()) {
@ -387,4 +389,12 @@ public class RatioBasedCompactionPolicy extends CompactionPolicy {
int numCandidates = storeFiles.size() - filesCompacting.size();
return numCandidates >= comConf.getMinFilesToCompact();
}
/**
* Overwrite min threshold for compaction
* @param minThreshold min to update to
*/
public void setMinThreshold(int minThreshold) {
comConf.setMinFilesToCompact(minThreshold);
}
}

View File

@ -100,6 +100,18 @@ public class MockStoreFile extends StoreFile {
this.entryCount = entryCount;
}
public Long getMinimumTimestamp() {
return (timeRangeTracker == null) ?
null :
timeRangeTracker.getMinimumTimestamp();
}
public Long getMaximumTimestamp() {
return (timeRangeTracker == null) ?
null :
timeRangeTracker.getMaximumTimestamp();
}
@Override
public StoreFile.Reader getReader() {
final long len = this.length;

View File

@ -0,0 +1,207 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import com.google.common.collect.Lists;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.compactions.RatioBasedCompactionPolicy;
import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.experimental.categories.Category;
@Category(SmallTests.class)
public class TestCompactionPolicy {
private final static Log LOG = LogFactory.getLog(TestDefaultCompactSelection.class);
protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
protected Configuration conf;
protected HStore store;
private static final String DIR = TEST_UTIL.getDataTestDir(
TestDefaultCompactSelection.class.getSimpleName()).toString();
protected static Path TEST_FILE;
protected static final int minFiles = 3;
protected static final int maxFiles = 5;
protected static final long minSize = 10;
protected static final long maxSize = 2100;
private FSHLog hlog;
private HRegion region;
@Before
public void setUp() throws Exception {
config();
initialize();
}
/**
* setup config values necessary for store
*/
protected void config() {
this.conf = TEST_UTIL.getConfiguration();
this.conf.setLong(HConstants.MAJOR_COMPACTION_PERIOD, 0);
this.conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, minFiles);
this.conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY, maxFiles);
this.conf.setLong(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_SIZE_KEY, minSize);
this.conf.setLong(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_SIZE_KEY, maxSize);
this.conf.setFloat(CompactionConfiguration.HBASE_HSTORE_COMPACTION_RATIO_KEY, 1.0F);
}
/**
* Setting up a Store
* @throws IOException with error
*/
protected void initialize() throws IOException {
Path basedir = new Path(DIR);
String logName = "logs";
Path logdir = new Path(DIR, logName);
HColumnDescriptor hcd = new HColumnDescriptor(Bytes.toBytes("family"));
FileSystem fs = FileSystem.get(conf);
fs.delete(logdir, true);
HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(Bytes.toBytes("table")));
htd.addFamily(hcd);
HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false);
hlog = new FSHLog(fs, basedir, logName, conf);
region = HRegion.createHRegion(info, basedir, conf, htd, hlog);
region.close();
Path tableDir = FSUtils.getTableDir(basedir, htd.getTableName());
region = new HRegion(tableDir, hlog, fs, conf, info, htd, null);
store = new HStore(region, hcd, conf);
TEST_FILE = region.getRegionFileSystem().createTempName();
fs.createNewFile(TEST_FILE);
}
@After
public void tearDown() throws IOException {
IOException ex = null;
try {
region.close();
} catch (IOException e) {
LOG.warn("Caught Exception", e);
ex = e;
}
try {
hlog.close();
} catch (IOException e) {
LOG.warn("Caught Exception", e);
ex = e;
}
if (ex != null) {
throw ex;
}
}
ArrayList<Long> toArrayList(long... numbers) {
ArrayList<Long> result = new ArrayList<Long>();
for (long i : numbers) {
result.add(i);
}
return result;
}
List<StoreFile> sfCreate(long... sizes) throws IOException {
ArrayList<Long> ageInDisk = new ArrayList<Long>();
for (int i = 0; i < sizes.length; i++) {
ageInDisk.add(0L);
}
return sfCreate(toArrayList(sizes), ageInDisk);
}
List<StoreFile> sfCreate(ArrayList<Long> sizes, ArrayList<Long> ageInDisk) throws IOException {
return sfCreate(false, sizes, ageInDisk);
}
List<StoreFile> sfCreate(boolean isReference, long... sizes) throws IOException {
ArrayList<Long> ageInDisk = new ArrayList<Long>(sizes.length);
for (int i = 0; i < sizes.length; i++) {
ageInDisk.add(0L);
}
return sfCreate(isReference, toArrayList(sizes), ageInDisk);
}
List<StoreFile> sfCreate(boolean isReference, ArrayList<Long> sizes, ArrayList<Long> ageInDisk)
throws IOException {
List<StoreFile> ret = Lists.newArrayList();
for (int i = 0; i < sizes.size(); i++) {
ret.add(new MockStoreFile(TEST_UTIL, TEST_FILE, sizes.get(i), ageInDisk.get(i), isReference,
i));
}
return ret;
}
long[] getSizes(List<StoreFile> sfList) {
long[] aNums = new long[sfList.size()];
for (int i = 0; i < sfList.size(); ++i) {
aNums[i] = sfList.get(i).getReader().length();
}
return aNums;
}
void compactEquals(List<StoreFile> candidates, long... expected) throws IOException {
compactEquals(candidates, false, false, expected);
}
void compactEquals(List<StoreFile> candidates, boolean forcemajor, long... expected)
throws IOException {
compactEquals(candidates, forcemajor, false, expected);
}
void compactEquals(List<StoreFile> candidates, boolean forcemajor, boolean isOffPeak,
long... expected) throws IOException {
store.forceMajor = forcemajor;
// Test Default compactions
CompactionRequest result =
((RatioBasedCompactionPolicy) store.storeEngine.getCompactionPolicy()).selectCompaction(
candidates, new ArrayList<StoreFile>(), false, isOffPeak, forcemajor);
List<StoreFile> actual = new ArrayList<StoreFile>(result.getFiles());
if (isOffPeak && !forcemajor) {
Assert.assertTrue(result.isOffPeak());
}
Assert.assertEquals(Arrays.toString(expected), Arrays.toString(getSizes(actual)));
store.forceMajor = false;
}
}

View File

@ -0,0 +1,211 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
import org.apache.hadoop.hbase.regionserver.compactions.DateTieredCompactionPolicy;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.Assert;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category(SmallTests.class)
public class TestDateTieredCompaction extends TestCompactionPolicy {
ArrayList<StoreFile> sfCreate(long[] minTimestamps, long[] maxTimestamps, long[] sizes)
throws IOException {
ArrayList<Long> ageInDisk = new ArrayList<Long>();
for (int i = 0; i < sizes.length; i++) {
ageInDisk.add(0L);
}
ArrayList<StoreFile> ret = Lists.newArrayList();
for (int i = 0; i < sizes.length; i++) {
MockStoreFile msf =
new MockStoreFile(TEST_UTIL, TEST_FILE, sizes[i], ageInDisk.get(i), false, i);
msf.setTimeRangeTracker(new TimeRangeTracker(minTimestamps[i], maxTimestamps[i]));
ret.add(msf);
}
return ret;
}
@Override
protected void config() {
super.config();
// Set up policy
conf.setLong(CompactionConfiguration.MAX_AGE_MILLIS_KEY, 100);
conf.setLong(CompactionConfiguration.INCOMING_WINDOW_MIN_KEY, 3);
conf.setLong(CompactionConfiguration.BASE_WINDOW_MILLIS_KEY, 6);
conf.setInt(CompactionConfiguration.WINDOWS_PER_TIER_KEY, 4);
conf.set(DefaultStoreEngine.DEFAULT_COMPACTION_POLICY_CLASS_KEY,
DateTieredCompactionPolicy.class.getName());
// Special settings for compaction policy per window
this.conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 2);
this.conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY, 12);
this.conf.setFloat(CompactionConfiguration.HBASE_HSTORE_COMPACTION_RATIO_KEY, 1.2F);
}
void compactEquals(long now, ArrayList<StoreFile> candidates, long... expected)
throws IOException {
Assert.assertTrue(((DateTieredCompactionPolicy) store.storeEngine.getCompactionPolicy())
.needsCompaction(candidates, ImmutableList.<StoreFile> of(), now));
List<StoreFile> actual =
((DateTieredCompactionPolicy) store.storeEngine.getCompactionPolicy())
.applyCompactionPolicy(candidates, false, false, now);
Assert.assertEquals(Arrays.toString(expected), Arrays.toString(getSizes(actual)));
}
/**
* Test for incoming window
* @throws IOException with error
*/
@Test
public void incomingWindow() throws IOException {
long[] minTimestamps = new long[] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 };
long[] maxTimestamps = new long[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15 };
long[] sizes = new long[] { 30, 31, 32, 33, 34, 20, 21, 22, 23, 24, 25, 10, 11, 12, 13 };
compactEquals(16, sfCreate(minTimestamps, maxTimestamps, sizes), 13, 12, 11, 10);
}
/**
* Not enough files in incoming window
* @throws IOException with error
*/
@Test
public void NotIncomingWindow() throws IOException {
long[] minTimestamps = new long[] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 };
long[] maxTimestamps = new long[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13 };
long[] sizes = new long[] { 30, 31, 32, 33, 34, 20, 21, 22, 23, 24, 25, 10, 11 };
compactEquals(16, sfCreate(minTimestamps, maxTimestamps, sizes), 25, 24, 23, 22, 21, 20);
}
/**
* Test for file newer than incoming window
* @throws IOException with error
*/
@Test
public void NewerThanIncomingWindow() throws IOException {
long[] minTimestamps = new long[] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 };
long[] maxTimestamps = new long[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 18 };
long[] sizes = new long[] { 30, 31, 32, 33, 34, 20, 21, 22, 23, 24, 25, 10, 11, 12, 13 };
compactEquals(16, sfCreate(minTimestamps, maxTimestamps, sizes), 13, 12, 11, 10);
}
/**
* If there is no T1 window, we don't build 2
* @throws IOException with error
*/
@Test
public void NoT2() throws IOException {
long[] minTimestamps = new long[] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 };
long[] maxTimestamps = new long[] { 44, 60, 61, 92, 95, 100 };
long[] sizes = new long[] { 0, 20, 21, 22, 23, 1 };
compactEquals(100, sfCreate(minTimestamps, maxTimestamps, sizes), 23, 22);
}
@Test
public void T1() throws IOException {
long[] minTimestamps = new long[] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 };
long[] maxTimestamps = new long[] { 44, 60, 61, 96, 100, 104, 120, 124, 143, 145, 157 };
long[] sizes = new long[] { 0, 50, 51, 40, 41, 42, 30, 31, 32, 2, 1 };
compactEquals(161, sfCreate(minTimestamps, maxTimestamps, sizes), 32, 31, 30);
}
/**
* Apply exploring logic on non-incoming window
* @throws IOException with error
*/
@Test
public void RatioT0() throws IOException {
long[] minTimestamps = new long[] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 };
long[] maxTimestamps = new long[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12 };
long[] sizes = new long[] { 30, 31, 32, 33, 34, 20, 21, 22, 280, 23, 24, 1 };
compactEquals(16, sfCreate(minTimestamps, maxTimestamps, sizes), 22, 21, 20);
}
/**
* Also apply ratio-based logic on t2 window
* @throws IOException with error
*/
@Test
public void RatioT2() throws IOException {
long[] minTimestamps = new long[] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 };
long[] maxTimestamps = new long[] { 44, 60, 61, 96, 100, 104, 120, 124, 143, 145, 157 };
long[] sizes = new long[] { 0, 50, 51, 40, 41, 42, 350, 30, 31, 2, 1 };
compactEquals(161, sfCreate(minTimestamps, maxTimestamps, sizes), 31, 30);
}
/**
* The next compaction call after testTieredCompactionRatioT0 is compacted
* @throws IOException with error
*/
@Test
public void RatioT0Next() throws IOException {
long[] minTimestamps = new long[] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 };
long[] maxTimestamps = new long[] { 1, 2, 3, 4, 5, 8, 9, 10, 11, 12 };
long[] sizes = new long[] { 30, 31, 32, 33, 34, 22, 280, 23, 24, 1 };
compactEquals(16, sfCreate(minTimestamps, maxTimestamps, sizes), 24, 23);
}
/**
* Older than now(161) - maxAge(100)
* @throws IOException with error
*/
@Test
public void olderThanMaxAge() throws IOException {
long[] minTimestamps = new long[] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 };
long[] maxTimestamps = new long[] { 44, 60, 61, 96, 100, 104, 105, 106, 113, 145, 157 };
long[] sizes = new long[] { 0, 50, 51, 40, 41, 42, 33, 30, 31, 2, 1 };
compactEquals(161, sfCreate(minTimestamps, maxTimestamps, sizes), 31, 30, 33, 42, 41, 40);
}
/**
* Out-of-order data
* @throws IOException with error
*/
@Test
public void OutOfOrder() throws IOException {
long[] minTimestamps = new long[] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 };
long[] maxTimestamps = new long[] { 0, 13, 3, 10, 11, 1, 2, 12, 14, 15 };
long[] sizes = new long[] { 30, 31, 32, 33, 34, 22, 28, 23, 24, 1 };
compactEquals(16, sfCreate(minTimestamps, maxTimestamps, sizes), 1, 24, 23, 28, 22, 34, 33, 32,
31);
}
}

View File

@ -17,187 +17,23 @@
*/
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import junit.framework.TestCase;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.compactions.RatioBasedCompactionPolicy;
import org.apache.hadoop.hbase.wal.DefaultWALProvider;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.junit.After;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.Assert;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import com.google.common.collect.Lists;
@Category(SmallTests.class)
public class TestDefaultCompactSelection extends TestCase {
private final static Log LOG = LogFactory.getLog(TestDefaultCompactSelection.class);
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
protected Configuration conf;
protected HStore store;
private static final String DIR=
TEST_UTIL.getDataTestDir(TestDefaultCompactSelection.class.getSimpleName()).toString();
private static Path TEST_FILE;
protected static final int minFiles = 3;
protected static final int maxFiles = 5;
protected static final long minSize = 10;
protected static final long maxSize = 2100;
private WALFactory wals;
private HRegion region;
@Override
public void setUp() throws Exception {
// setup config values necessary for store
this.conf = TEST_UTIL.getConfiguration();
this.conf.setLong(HConstants.MAJOR_COMPACTION_PERIOD, 0);
this.conf.setInt("hbase.hstore.compaction.min", minFiles);
this.conf.setInt("hbase.hstore.compaction.max", maxFiles);
this.conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, minSize);
this.conf.setLong("hbase.hstore.compaction.max.size", maxSize);
this.conf.setFloat("hbase.hstore.compaction.ratio", 1.0F);
//Setting up a Store
final String id = TestDefaultCompactSelection.class.getName();
Path basedir = new Path(DIR);
final Path logdir = new Path(basedir, DefaultWALProvider.getWALDirectoryName(id));
HColumnDescriptor hcd = new HColumnDescriptor(Bytes.toBytes("family"));
FileSystem fs = FileSystem.get(conf);
fs.delete(logdir, true);
HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(Bytes.toBytes("table")));
htd.addFamily(hcd);
HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false);
final Configuration walConf = new Configuration(conf);
FSUtils.setRootDir(walConf, basedir);
wals = new WALFactory(walConf, null, id);
region = HRegion.createHRegion(info, basedir, conf, htd);
HRegion.closeHRegion(region);
Path tableDir = FSUtils.getTableDir(basedir, htd.getTableName());
region = new HRegion(tableDir, wals.getWAL(info.getEncodedNameAsBytes(), info.getTable()
.getNamespace()), fs, conf, info, htd, null);
store = new HStore(region, hcd, conf);
TEST_FILE = region.getRegionFileSystem().createTempName();
fs.createNewFile(TEST_FILE);
}
@After
public void tearDown() throws IOException {
IOException ex = null;
try {
region.close();
} catch (IOException e) {
LOG.warn("Caught Exception", e);
ex = e;
}
try {
wals.close();
} catch (IOException e) {
LOG.warn("Caught Exception", e);
ex = e;
}
if (ex != null) {
throw ex;
}
}
ArrayList<Long> toArrayList(long... numbers) {
ArrayList<Long> result = new ArrayList<Long>();
for (long i : numbers) {
result.add(i);
}
return result;
}
List<StoreFile> sfCreate(long... sizes) throws IOException {
ArrayList<Long> ageInDisk = new ArrayList<Long>();
for (int i = 0; i < sizes.length; i++) {
ageInDisk.add(0L);
}
return sfCreate(toArrayList(sizes), ageInDisk);
}
List<StoreFile> sfCreate(ArrayList<Long> sizes, ArrayList<Long> ageInDisk)
throws IOException {
return sfCreate(false, sizes, ageInDisk);
}
List<StoreFile> sfCreate(boolean isReference, long... sizes) throws IOException {
ArrayList<Long> ageInDisk = new ArrayList<Long>(sizes.length);
for (int i = 0; i < sizes.length; i++) {
ageInDisk.add(0L);
}
return sfCreate(isReference, toArrayList(sizes), ageInDisk);
}
List<StoreFile> sfCreate(boolean isReference, ArrayList<Long> sizes, ArrayList<Long> ageInDisk)
throws IOException {
List<StoreFile> ret = Lists.newArrayList();
for (int i = 0; i < sizes.size(); i++) {
ret.add(new MockStoreFile(TEST_UTIL, TEST_FILE,
sizes.get(i), ageInDisk.get(i), isReference, i));
}
return ret;
}
long[] getSizes(List<StoreFile> sfList) {
long[] aNums = new long[sfList.size()];
for (int i = 0; i < sfList.size(); ++i) {
aNums[i] = sfList.get(i).getReader().length();
}
return aNums;
}
void compactEquals(List<StoreFile> candidates, long... expected)
throws IOException {
compactEquals(candidates, false, false, expected);
}
void compactEquals(List<StoreFile> candidates, boolean forcemajor, long... expected)
throws IOException {
compactEquals(candidates, forcemajor, false, expected);
}
void compactEquals(List<StoreFile> candidates, boolean forcemajor, boolean isOffPeak,
long ... expected)
throws IOException {
store.forceMajor = forcemajor;
//Test Default compactions
CompactionRequest result = ((RatioBasedCompactionPolicy)store.storeEngine.getCompactionPolicy())
.selectCompaction(candidates, new ArrayList<StoreFile>(), false, isOffPeak, forcemajor);
List<StoreFile> actual = new ArrayList<StoreFile>(result.getFiles());
if (isOffPeak && !forcemajor) {
assertTrue(result.isOffPeak());
}
assertEquals(Arrays.toString(expected), Arrays.toString(getSizes(actual)));
store.forceMajor = false;
}
public class TestDefaultCompactSelection extends TestCompactionPolicy {
@Test
public void testCompactionRatio() throws IOException {
/**
* NOTE: these tests are specific to describe the implementation of the
@ -278,6 +114,7 @@ public class TestDefaultCompactSelection extends TestCase {
compactEquals(sfCreate(tooBig, tooBig) /* empty */);
}
@Test
public void testOffPeakCompactionRatio() throws IOException {
/*
* NOTE: these tests are specific to describe the implementation of the
@ -292,6 +129,7 @@ public class TestDefaultCompactSelection extends TestCase {
compactEquals(sfCreate(999, 50, 12, 12, 1), 12, 12, 1);
}
@Test
public void testStuckStoreCompaction() throws IOException {
// Select the smallest compaction if the store is stuck.
compactEquals(sfCreate(99,99,99,99,99,99, 30,30,30,30), 30, 30, 30);
@ -306,6 +144,7 @@ public class TestDefaultCompactSelection extends TestCase {
compactEquals(sfCreate(99,99,99,99, 27,27,27,20,20,20), 20, 20, 20);
}
@Test
public void testCompactionEmptyHFile() throws IOException {
// Set TTL
ScanInfo oldScanInfo = store.getScanInfo();
@ -327,7 +166,7 @@ public class TestDefaultCompactSelection extends TestCase {
CompactionRequest result = ((RatioBasedCompactionPolicy) store.storeEngine
.getCompactionPolicy()).selectCompaction(candidates,
new ArrayList<StoreFile>(), false, false, false);
assertTrue(result.getFiles().size() == 0);
Assert.assertTrue(result.getFiles().size() == 0);
store.setScanInfo(oldScanInfo);
}
}