HBASE-15368 Add pluggable window support
parent b6617b4eb9
commit 58f175f0ea
CompactionConfiguration.java:
@@ -70,21 +70,23 @@ public class CompactionConfiguration {
   /*
   * The epoch time length for the windows we no longer compact
   */
-  public static final String MAX_AGE_MILLIS_KEY =
+  public static final String DATE_TIERED_MAX_AGE_MILLIS_KEY =
     "hbase.hstore.compaction.date.tiered.max.storefile.age.millis";
-  public static final String BASE_WINDOW_MILLIS_KEY =
-    "hbase.hstore.compaction.date.tiered.base.window.millis";
-  public static final String WINDOWS_PER_TIER_KEY =
-    "hbase.hstore.compaction.date.tiered.windows.per.tier";
-  public static final String INCOMING_WINDOW_MIN_KEY =
+  public static final String DATE_TIERED_INCOMING_WINDOW_MIN_KEY =
     "hbase.hstore.compaction.date.tiered.incoming.window.min";
-  public static final String COMPACTION_POLICY_CLASS_FOR_TIERED_WINDOWS_KEY =
+  public static final String COMPACTION_POLICY_CLASS_FOR_DATE_TIERED_WINDOWS_KEY =
     "hbase.hstore.compaction.date.tiered.window.policy.class";
-  public static final String SINGLE_OUTPUT_FOR_MINOR_COMPACTION_KEY =
+  public static final String DATE_TIERED_SINGLE_OUTPUT_FOR_MINOR_COMPACTION_KEY =
     "hbase.hstore.compaction.date.tiered.single.output.for.minor.compaction";

   private static final Class<? extends RatioBasedCompactionPolicy>
-    DEFAULT_TIER_COMPACTION_POLICY_CLASS = ExploringCompactionPolicy.class;
+    DEFAULT_COMPACTION_POLICY_CLASS_FOR_DATE_TIERED_WINDOWS = ExploringCompactionPolicy.class;
+
+  public static final String DATE_TIERED_COMPACTION_WINDOW_FACTORY_CLASS_KEY =
+    "hbase.hstore.compaction.date.tiered.window.factory.class";
+
+  private static final Class<? extends CompactionWindowFactory>
+    DEFAULT_DATE_TIERED_COMPACTION_WINDOW_FACTORY_CLASS = ExponentialCompactionWindowFactory.class;

   Configuration conf;
   StoreConfigInformation storeConfigInfo;
@@ -102,12 +104,11 @@ public class CompactionConfiguration {
   private final long majorCompactionPeriod;
   private final float majorCompactionJitter;
   private final float minLocalityToForceCompact;
-  private final long maxStoreFileAgeMillis;
-  private final long baseWindowMillis;
-  private final int windowsPerTier;
-  private final int incomingWindowMin;
-  private final String compactionPolicyForTieredWindow;
-  private final boolean singleOutputForMinorCompaction;
+  private final long dateTieredMaxStoreFileAgeMillis;
+  private final int dateTieredIncomingWindowMin;
+  private final String compactionPolicyForDateTieredWindow;
+  private final boolean dateTieredSingleOutputForMinorCompaction;
+  private final String dateTieredCompactionWindowFactory;

   CompactionConfiguration(Configuration conf, StoreConfigInformation storeConfigInfo) {
     this.conf = conf;
@@ -131,15 +132,16 @@ public class CompactionConfiguration {
     majorCompactionJitter = conf.getFloat("hbase.hregion.majorcompaction.jitter", 0.50F);
     minLocalityToForceCompact = conf.getFloat(HBASE_HSTORE_MIN_LOCALITY_TO_SKIP_MAJOR_COMPACT, 0f);

-    maxStoreFileAgeMillis = conf.getLong(MAX_AGE_MILLIS_KEY, Long.MAX_VALUE);
-    baseWindowMillis = conf.getLong(BASE_WINDOW_MILLIS_KEY, 3600000 * 6);
-    windowsPerTier = conf.getInt(WINDOWS_PER_TIER_KEY, 4);
-    incomingWindowMin = conf.getInt(INCOMING_WINDOW_MIN_KEY, 6);
-    compactionPolicyForTieredWindow = conf.get(COMPACTION_POLICY_CLASS_FOR_TIERED_WINDOWS_KEY,
-      DEFAULT_TIER_COMPACTION_POLICY_CLASS.getName());
-    singleOutputForMinorCompaction = conf.getBoolean(SINGLE_OUTPUT_FOR_MINOR_COMPACTION_KEY,
-      true);
+    dateTieredMaxStoreFileAgeMillis = conf.getLong(DATE_TIERED_MAX_AGE_MILLIS_KEY, Long.MAX_VALUE);
+    dateTieredIncomingWindowMin = conf.getInt(DATE_TIERED_INCOMING_WINDOW_MIN_KEY, 6);
+    compactionPolicyForDateTieredWindow = conf.get(
+      COMPACTION_POLICY_CLASS_FOR_DATE_TIERED_WINDOWS_KEY,
+      DEFAULT_COMPACTION_POLICY_CLASS_FOR_DATE_TIERED_WINDOWS.getName());
+    dateTieredSingleOutputForMinorCompaction = conf
+      .getBoolean(DATE_TIERED_SINGLE_OUTPUT_FOR_MINOR_COMPACTION_KEY, true);
+    this.dateTieredCompactionWindowFactory = conf.get(
+      DATE_TIERED_COMPACTION_WINDOW_FACTORY_CLASS_KEY,
+      DEFAULT_DATE_TIERED_COMPACTION_WINDOW_FACTORY_CLASS.getName());
     LOG.info(this);
   }

@@ -148,8 +150,9 @@ public class CompactionConfiguration {
     return String.format(
       "size [%d, %d, %d); files [%d, %d); ratio %f; off-peak ratio %f; throttle point %d;"
       + " major period %d, major jitter %f, min locality to compact %f;"
-      + " tiered compaction: max_age %d, base window in milliseconds %d, windows per tier %d,"
-      + "incoming window min %d",
+      + " tiered compaction: max_age %d, incoming window min %d,"
+      + " compaction policy for tiered window %s, single output for minor %b,"
+      + " compaction window factory %s",
       minCompactSize,
       maxCompactSize,
       offPeakMaxCompactSize,
@@ -161,10 +164,12 @@ public class CompactionConfiguration {
       majorCompactionPeriod,
       majorCompactionJitter,
       minLocalityToForceCompact,
-      maxStoreFileAgeMillis,
-      baseWindowMillis,
-      windowsPerTier,
-      incomingWindowMin);
+      dateTieredMaxStoreFileAgeMillis,
+      dateTieredIncomingWindowMin,
+      compactionPolicyForDateTieredWindow,
+      dateTieredSingleOutputForMinorCompaction,
+      dateTieredCompactionWindowFactory
+      );
   }

   /**
@@ -261,27 +266,23 @@ public class CompactionConfiguration {
     }
   }

-  public long getMaxStoreFileAgeMillis() {
-    return maxStoreFileAgeMillis;
+  public long getDateTieredMaxStoreFileAgeMillis() {
+    return dateTieredMaxStoreFileAgeMillis;
   }

-  public long getBaseWindowMillis() {
-    return baseWindowMillis;
+  public int getDateTieredIncomingWindowMin() {
+    return dateTieredIncomingWindowMin;
   }

-  public int getWindowsPerTier() {
-    return windowsPerTier;
+  public String getCompactionPolicyForDateTieredWindow() {
+    return compactionPolicyForDateTieredWindow;
   }

-  public int getIncomingWindowMin() {
-    return incomingWindowMin;
-  }
-
-  public String getCompactionPolicyForTieredWindow() {
-    return compactionPolicyForTieredWindow;
+  public boolean useDateTieredSingleOutputForMinorCompaction() {
+    return dateTieredSingleOutputForMinorCompaction;
   }

-  public boolean useSingleOutputForMinorCompaction() {
-    return singleOutputForMinorCompaction;
+  public String getDateTieredCompactionWindowFactory() {
+    return dateTieredCompactionWindowFactory;
   }
 }
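Taken together, these changes leave only the generic date-tiered knobs in CompactionConfiguration and move the window-shape knobs behind the new factory key. As a sketch (not part of this commit; the factory class name is made up), selecting a custom window factory looks like this, with the remaining keys taken verbatim from the diff above:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class PluggableWindowConfigSketch {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // Choose the window factory; "com.example.MyWindowFactory" is a hypothetical class.
    conf.set("hbase.hstore.compaction.date.tiered.window.factory.class",
        "com.example.MyWindowFactory");
    // The other date-tiered knobs keep the renamed keys from this commit:
    conf.setLong("hbase.hstore.compaction.date.tiered.max.storefile.age.millis",
        180L * 24 * 60 * 60 * 1000); // stop compacting files older than ~180 days
    conf.setInt("hbase.hstore.compaction.date.tiered.incoming.window.min", 6);
    conf.setBoolean(
        "hbase.hstore.compaction.date.tiered.single.output.for.minor.compaction", true);
  }
}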
CompactionWindow.java (new file):
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.compactions;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * Base class for compaction window implementation.
+ */
+@InterfaceAudience.Private
+public abstract class CompactionWindow {
+
+  /**
+   * Compares the window to a timestamp.
+   * @param timestamp the timestamp to compare.
+   * @return a negative integer, zero, or a positive integer as the window lies before, covering, or
+   *         after than the timestamp.
+   */
+  public abstract int compareToTimestamp(long timestamp);
+
+  /**
+   * Move to the new window of the same tier or of the next tier, which represents an earlier time
+   * span.
+   * @return The next earlier window
+   */
+  public abstract CompactionWindow nextEarlierWindow();
+
+  /**
+   * Inclusive lower bound
+   */
+  public abstract long startMillis();
+
+  /**
+   * Exclusive upper bound
+   */
+  public abstract long endMillis();
+
+  @Override
+  public String toString() {
+    return "[" + startMillis() + ", " + endMillis() + ")";
+  }
+}
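CompactionWindow deliberately exposes only interval arithmetic: compare a timestamp to the window, step to the next earlier window, report the bounds. For contrast with the exponential implementation added later in this commit, here is a minimal constant-size window. This is a sketch of a possible subclass, not code from the commit; it uses Math.floorDiv instead of the checkedSubtract trick the shipped code uses for negative timestamps, and omits overflow guards:

package org.apache.hadoop.hbase.regionserver.compactions;

/** Sketch: a window of constant span; every earlier window has the same size. */
public class FixedSizeWindow extends CompactionWindow {

  private final long windowMillis; // span of one window
  private final long divPosition;  // t is covered iff floor(t / windowMillis) == divPosition

  public FixedSizeWindow(long windowMillis, long divPosition) {
    this.windowMillis = windowMillis;
    this.divPosition = divPosition;
  }

  @Override
  public int compareToTimestamp(long timestamp) {
    // floorDiv keeps negative timestamps in the right bucket, which the shipped
    // implementation achieves via LongMath.checkedSubtract before dividing.
    long pos = Math.floorDiv(timestamp, windowMillis);
    return divPosition == pos ? 0 : divPosition < pos ? -1 : 1;
  }

  @Override
  public CompactionWindow nextEarlierWindow() {
    return new FixedSizeWindow(windowMillis, divPosition - 1); // same tier forever
  }

  @Override
  public long startMillis() {
    return windowMillis * divPosition; // inclusive; no overflow guard in this sketch
  }

  @Override
  public long endMillis() {
    return windowMillis * (divPosition + 1); // exclusive
  }
}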
CompactionWindowFactory.java (new file):
@@ -0,0 +1,29 @@
+[Apache License 2.0 header omitted; identical to the one in CompactionWindow.java]
+package org.apache.hadoop.hbase.regionserver.compactions;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * For creating compaction window.
+ */
+@InterfaceAudience.Private
+public abstract class CompactionWindowFactory {
+
+  public abstract CompactionWindow newIncomingWindow(long now);
+}
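The factory is the class named by hbase.hstore.compaction.date.tiered.window.factory.class. A sketch pairing with the hypothetical FixedSizeWindow above: DateTieredCompactionPolicy (changed below) instantiates the factory reflectively through a single-argument CompactionConfiguration constructor, and the conf field it reads is package-scoped, so a real factory would live in this package just as the shipped exponential one does. The key name here is invented for illustration:

package org.apache.hadoop.hbase.regionserver.compactions;

/** Sketch: factory for the hypothetical FixedSizeWindow from the previous example. */
public class FixedSizeWindowFactory extends CompactionWindowFactory {

  // Hypothetical key, not one defined by this commit.
  public static final String FIXED_WINDOW_MILLIS_KEY =
      "hbase.hstore.compaction.date.tiered.fixed.window.millis";

  private final long windowMillis;

  // DateTieredCompactionPolicy invokes exactly this constructor shape via reflection.
  public FixedSizeWindowFactory(CompactionConfiguration comConf) {
    this.windowMillis = comConf.conf.getLong(FIXED_WINDOW_MILLIS_KEY, 6L * 60 * 60 * 1000);
  }

  @Override
  public CompactionWindow newIncomingWindow(long now) {
    // Align the incoming window so that it covers "now".
    return new FixedSizeWindow(windowMillis, Math.floorDiv(now, windowMillis));
  }
}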
DateTieredCompactionPolicy.java:
@@ -19,8 +19,6 @@
 package org.apache.hadoop.hbase.regionserver.compactions;

 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Predicate;
-import com.google.common.collect.Iterables;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
 import com.google.common.collect.PeekingIterator;
@@ -49,33 +47,48 @@ import org.apache.hadoop.hbase.util.ReflectionUtils;
 /**
  * HBASE-15181 This is a simple implementation of date-based tiered compaction similar to
  * Cassandra's for the following benefits:
- * 1. Improve date-range-based scan by structuring store files in date-based tiered layout.
- * 2. Reduce compaction overhead.
- * 3. Improve TTL efficiency.
+ * <ol>
+ * <li>Improve date-range-based scan by structuring store files in date-based tiered layout.</li>
+ * <li>Reduce compaction overhead.</li>
+ * <li>Improve TTL efficiency.</li>
+ * </ol>
  * Perfect fit for the use cases that:
- * 1. has mostly date-based data write and scan and a focus on the most recent data.
- * Out-of-order writes are handled gracefully. Time range overlapping among store files is
- * tolerated and the performance impact is minimized. Configuration can be set at hbase-site
- * or overridden at per-table or per-column-family level by hbase shell. Design spec is at
+ * <ol>
+ * <li>has mostly date-based data write and scan and a focus on the most recent data.</li>
+ * </ol>
+ * Out-of-order writes are handled gracefully. Time range overlapping among store files is tolerated
+ * and the performance impact is minimized. Configuration can be set at hbase-site or overridden at
+ * per-table or per-column-family level by hbase shell. Design spec is at
  * https://docs.google.com/document/d/1_AmlNb2N8Us1xICsTeGDLKIqL6T-oHoRLZ323MG_uy8/
  */
 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
 public class DateTieredCompactionPolicy extends SortedCompactionPolicy {

   private static final Log LOG = LogFactory.getLog(DateTieredCompactionPolicy.class);

-  private RatioBasedCompactionPolicy compactionPolicyPerWindow;
+  private final RatioBasedCompactionPolicy compactionPolicyPerWindow;
+
+  private final CompactionWindowFactory windowFactory;

   public DateTieredCompactionPolicy(Configuration conf, StoreConfigInformation storeConfigInfo)
       throws IOException {
     super(conf, storeConfigInfo);
     try {
-      compactionPolicyPerWindow =
-          ReflectionUtils.instantiateWithCustomCtor(comConf.getCompactionPolicyForTieredWindow(),
-            new Class[] { Configuration.class, StoreConfigInformation.class }, new Object[] { conf,
-                storeConfigInfo });
+      compactionPolicyPerWindow = ReflectionUtils.instantiateWithCustomCtor(
+        comConf.getCompactionPolicyForDateTieredWindow(),
+        new Class[] { Configuration.class, StoreConfigInformation.class },
+        new Object[] { conf, storeConfigInfo });
     } catch (Exception e) {
       throw new IOException("Unable to load configured compaction policy '"
-          + comConf.getCompactionPolicyForTieredWindow() + "'", e);
+          + comConf.getCompactionPolicyForDateTieredWindow() + "'", e);
     }
+    try {
+      windowFactory = ReflectionUtils.instantiateWithCustomCtor(
+        comConf.getDateTieredCompactionWindowFactory(),
+        new Class[] { CompactionConfiguration.class }, new Object[] { comConf });
+    } catch (Exception e) {
+      throw new IOException("Unable to load configured window factory '"
+        + comConf.getDateTieredCompactionWindowFactory() + "'", e);
+    }
   }

@@ -88,7 +101,7 @@ public class DateTieredCompactionPolicy extends SortedCompactionPolicy {
       final List<StoreFile> filesCompacting) {
     ArrayList<StoreFile> candidates = new ArrayList<StoreFile>(storeFiles);
     try {
-      return selectMinorCompaction(candidates, false, true) != null;
+      return !selectMinorCompaction(candidates, false, true).getFiles().isEmpty();
     } catch (Exception e) {
       LOG.error("Can not check for compaction: ", e);
       return false;
@@ -111,8 +124,7 @@ public class DateTieredCompactionPolicy extends SortedCompactionPolicy {

     long cfTTL = this.storeConfigInfo.getStoreFileTtl();
     HDFSBlocksDistribution hdfsBlocksDistribution = new HDFSBlocksDistribution();
-    long oldestToCompact = getOldestToCompact(comConf.getMaxStoreFileAgeMillis(), now);
-    List<Long> boundaries = getCompactBoundariesForMajor(filesToCompact, oldestToCompact, now);
+    List<Long> boundaries = getCompactBoundariesForMajor(filesToCompact, now);
     boolean[] filesInWindow = new boolean[boundaries.size()];

     for (StoreFile file: filesToCompact) {
@@ -167,15 +179,16 @@ public class DateTieredCompactionPolicy extends SortedCompactionPolicy {
       boolean tryingMajor, boolean mayUseOffPeak, boolean mayBeStuck) throws IOException {
     CompactionRequest result = tryingMajor ? selectMajorCompaction(candidateSelection)
       : selectMinorCompaction(candidateSelection, mayUseOffPeak, mayBeStuck);
-    LOG.debug("Generated compaction request: " + result);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Generated compaction request: " + result);
+    }
     return result;
   }

   public CompactionRequest selectMajorCompaction(ArrayList<StoreFile> candidateSelection) {
     long now = EnvironmentEdgeManager.currentTime();
-    long oldestToCompact = getOldestToCompact(comConf.getMaxStoreFileAgeMillis(), now);
     return new DateTieredCompactionRequest(candidateSelection,
-      this.getCompactBoundariesForMajor(candidateSelection, oldestToCompact, now));
+      this.getCompactBoundariesForMajor(candidateSelection, now));
   }

   /**
@@ -189,15 +202,12 @@ public class DateTieredCompactionPolicy extends SortedCompactionPolicy {
   public CompactionRequest selectMinorCompaction(ArrayList<StoreFile> candidateSelection,
       boolean mayUseOffPeak, boolean mayBeStuck) throws IOException {
     long now = EnvironmentEdgeManager.currentTime();
-    long oldestToCompact = getOldestToCompact(comConf.getMaxStoreFileAgeMillis(), now);
-
-    List<StoreFile> storeFileList = Lists.newArrayList(filterOldStoreFiles(candidateSelection,
-      oldestToCompact));
+    long oldestToCompact = getOldestToCompact(comConf.getDateTieredMaxStoreFileAgeMillis(), now);

     List<Pair<StoreFile, Long>> storefileMaxTimestampPairs =
-      Lists.newArrayListWithCapacity(Iterables.size(storeFileList));
+      Lists.newArrayListWithCapacity(candidateSelection.size());
     long maxTimestampSeen = Long.MIN_VALUE;
-    for (StoreFile storeFile : storeFileList) {
+    for (StoreFile storeFile : candidateSelection) {
       // if there is out-of-order data,
       // we put them in the same window as the last file in increasing order
       maxTimestampSeen = Math.max(maxTimestampSeen,
@@ -206,16 +216,18 @@ public class DateTieredCompactionPolicy extends SortedCompactionPolicy {
     }
     Collections.reverse(storefileMaxTimestampPairs);

-    Window window = getIncomingWindow(now, comConf.getBaseWindowMillis());
-    int minThreshold = comConf.getIncomingWindowMin();
+    CompactionWindow window = getIncomingWindow(now);
+    int minThreshold = comConf.getDateTieredIncomingWindowMin();
     PeekingIterator<Pair<StoreFile, Long>> it =
       Iterators.peekingIterator(storefileMaxTimestampPairs.iterator());
     while (it.hasNext()) {
+      if (window.compareToTimestamp(oldestToCompact) < 0) {
+        break;
+      }
       int compResult = window.compareToTimestamp(it.peek().getSecond());
       if (compResult > 0) {
         // If the file is too old for the window, switch to the next window
-        window = window.nextWindow(comConf.getWindowsPerTier(),
-          oldestToCompact);
+        window = window.nextEarlierWindow();
         minThreshold = comConf.getMinFilesToCompact();
       } else {
         // The file is within the target window
@@ -226,7 +238,9 @@ public class DateTieredCompactionPolicy extends SortedCompactionPolicy {
         fileList.add(it.next().getFirst());
       }
       if (fileList.size() >= minThreshold) {
-        LOG.debug("Processing files: " + fileList + " for window: " + window);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Processing files: " + fileList + " for window: " + window);
+        }
         DateTieredCompactionRequest request = generateCompactionRequest(fileList, window,
           mayUseOffPeak, mayBeStuck, minThreshold);
         if (request != null) {
@@ -240,8 +254,8 @@ public class DateTieredCompactionPolicy extends SortedCompactionPolicy {
   }

   private DateTieredCompactionRequest generateCompactionRequest(ArrayList<StoreFile> storeFiles,
-      Window window, boolean mayUseOffPeak, boolean mayBeStuck, int minThreshold)
-      throws IOException {
+      CompactionWindow window, boolean mayUseOffPeak, boolean mayBeStuck, int minThreshold)
+      throws IOException {
     // The files has to be in ascending order for ratio-based compaction to work right
     // and removeExcessFile to exclude youngest files.
     Collections.reverse(storeFiles);
@@ -254,7 +268,7 @@ public class DateTieredCompactionPolicy extends SortedCompactionPolicy {
     // If there is any file in the window excluded from compaction,
     // only one file will be output from compaction.
     boolean singleOutput = storeFiles.size() != storeFileSelection.size() ||
-      comConf.useSingleOutputForMinorCompaction();
+      comConf.useDateTieredSingleOutputForMinorCompaction();
     List<Long> boundaries = getCompactionBoundariesForMinor(window, singleOutput);
     DateTieredCompactionRequest result = new DateTieredCompactionRequest(storeFileSelection,
       boundaries);
@@ -267,20 +281,20 @@ public class DateTieredCompactionPolicy extends SortedCompactionPolicy {
    * Return a list of boundaries for multiple compaction output
    * in ascending order.
    */
-  private List<Long> getCompactBoundariesForMajor(Collection<StoreFile> filesToCompact,
-      long oldestToCompact, long now) {
+  private List<Long> getCompactBoundariesForMajor(Collection<StoreFile> filesToCompact, long now) {
     long minTimestamp = Long.MAX_VALUE;
     for (StoreFile file : filesToCompact) {
-      minTimestamp = Math.min(minTimestamp,
-        file.getMinimumTimestamp() == null? Long.MAX_VALUE : file.getMinimumTimestamp());
+      minTimestamp =
+        Math.min(minTimestamp,
+          file.getMinimumTimestamp() == null ? Long.MAX_VALUE : file.getMinimumTimestamp());
     }

     List<Long> boundaries = new ArrayList<Long>();

     // Add startMillis of all windows between now and min timestamp
-    for (Window window = getIncomingWindow(now, comConf.getBaseWindowMillis());
-      window.compareToTimestamp(minTimestamp) > 0;
-      window = window.nextWindow(comConf.getWindowsPerTier(), oldestToCompact)) {
+    for (CompactionWindow window = getIncomingWindow(now);
+      window.compareToTimestamp(minTimestamp) > 0;
+      window = window.nextEarlierWindow()) {
       boundaries.add(window.startMillis());
     }
     boundaries.add(Long.MIN_VALUE);
@@ -289,10 +303,10 @@ public class DateTieredCompactionPolicy extends SortedCompactionPolicy {
   }

   /**
-   * @return a list of boundaries for multiple compaction output
-   * from minTimestamp to maxTimestamp.
+   * @return a list of boundaries for multiple compaction output from minTimestamp to maxTimestamp.
    */
-  private static List<Long> getCompactionBoundariesForMinor(Window window, boolean singleOutput) {
+  private static List<Long> getCompactionBoundariesForMinor(CompactionWindow window,
+      boolean singleOutput) {
     List<Long> boundaries = new ArrayList<Long>();
     boundaries.add(Long.MIN_VALUE);
     if (!singleOutput) {
@@ -301,122 +315,17 @@ public class DateTieredCompactionPolicy extends SortedCompactionPolicy {
     return boundaries;
   }

-  /**
-   * Removes all store files with max timestamp older than (current - maxAge).
-   * @param storeFiles all store files to consider
-   * @param maxAge the age in milliseconds when a store file stops participating in compaction.
-   * @return a list of storeFiles with the store file older than maxAge excluded
-   */
-  private static Iterable<StoreFile> filterOldStoreFiles(List<StoreFile> storeFiles,
-      final long cutoff) {
-    return Iterables.filter(storeFiles, new Predicate<StoreFile>() {
-      @Override
-      public boolean apply(StoreFile storeFile) {
-        // Known findbugs issue to guava. SuppressWarning or Nonnull annotation don't work.
-        if (storeFile == null) {
-          return false;
-        }
-        Long maxTimestamp = storeFile.getMaximumTimestamp();
-        return maxTimestamp == null ? true : maxTimestamp >= cutoff;
-      }
-    });
-  }
-
-  private static Window getIncomingWindow(long now, long baseWindowMillis) {
-    return new Window(baseWindowMillis, now / baseWindowMillis);
+  private CompactionWindow getIncomingWindow(long now) {
+    return windowFactory.newIncomingWindow(now);
   }

   private static long getOldestToCompact(long maxAgeMillis, long now) {
     try {
       return LongMath.checkedSubtract(now, maxAgeMillis);
     } catch (ArithmeticException ae) {
-      LOG.warn("Value for " + CompactionConfiguration.MAX_AGE_MILLIS_KEY + ": " + maxAgeMillis
-        + ". All the files will be eligible for minor compaction.");
+      LOG.warn("Value for " + CompactionConfiguration.DATE_TIERED_MAX_AGE_MILLIS_KEY + ": "
+        + maxAgeMillis + ". All the files will be eligible for minor compaction.");
       return Long.MIN_VALUE;
     }
   }
-
-  /**
-   * This is the class we use to partition from epoch time to now into tiers of exponential sizes of
-   * windows.
-   */
-  private static final class Window {
-    /**
-     * How big a range of timestamps fit inside the window in milliseconds.
-     */
-    private final long windowMillis;
-
-    /**
-     * A timestamp t is within the window iff t / size == divPosition.
-     */
-    private final long divPosition;
-
-    private Window(long baseWindowMillis, long divPosition) {
-      windowMillis = baseWindowMillis;
-      this.divPosition = divPosition;
-    }
-
-    /**
-     * Compares the window to a timestamp.
-     * @param timestamp the timestamp to compare.
-     * @return a negative integer, zero, or a positive integer as the window lies before, covering,
-     * or after than the timestamp.
-     */
-    public int compareToTimestamp(long timestamp) {
-      if (timestamp < 0) {
-        try {
-          timestamp = LongMath.checkedSubtract(timestamp, windowMillis - 1);
-        } catch (ArithmeticException ae) {
-          timestamp = Long.MIN_VALUE;
-        }
-      }
-      long pos = timestamp / windowMillis;
-      return divPosition == pos ? 0 : divPosition < pos ? -1 : 1;
-    }
-
-    /**
-     * Move to the new window of the same tier or of the next tier, which represents an earlier time
-     * span.
-     * @param windowsPerTier The number of contiguous windows that will have the same size. Windows
-     *   following those will be <code>tierBase</code> times as big.
-     * @return The next window
-     */
-    public Window nextWindow(int windowsPerTier, long oldestToCompact) {
-      // Don't promote to the next tier if there is not even 1 window at current tier
-      // or if the next window crosses the max age.
-      if (divPosition % windowsPerTier > 0 ||
-          startMillis() - windowMillis * windowsPerTier < oldestToCompact) {
-        return new Window(windowMillis, divPosition - 1);
-      } else {
-        return new Window(windowMillis * windowsPerTier, divPosition / windowsPerTier - 1);
-      }
-    }
-
-    /**
-     * Inclusive lower bound
-     */
-    public long startMillis() {
-      try {
-        return LongMath.checkedMultiply(windowMillis, divPosition);
-      } catch (ArithmeticException ae) {
-        return Long.MIN_VALUE;
-      }
-    }
-
-    /**
-     * Exclusive upper bound
-     */
-    public long endMillis() {
-      try {
-        return LongMath.checkedMultiply(windowMillis, (divPosition + 1));
-      } catch (ArithmeticException ae) {
-        return Long.MAX_VALUE;
-      }
-    }
-
-    @Override
-    public String toString() {
-      return "[" + startMillis() + ", " + endMillis() + ")";
-    }
-  }
 }
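After this refactor the policy contains no window arithmetic of its own: major-compaction boundaries are just the startMillis() of each window, walked from the incoming window back past the oldest minimum timestamp. A standalone sketch of that loop follows; the names are illustrative stand-ins for the private getCompactBoundariesForMajor above, and the final reverse reflects the ascending order its javadoc promises:

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.hadoop.hbase.regionserver.compactions.CompactionWindow;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionWindowFactory;

public class BoundaryWalkSketch {
  /** Mirrors the shape of getCompactBoundariesForMajor: one boundary per window. */
  static List<Long> majorBoundaries(CompactionWindowFactory factory, long now, long minTimestamp) {
    List<Long> boundaries = new ArrayList<Long>();
    for (CompactionWindow w = factory.newIncomingWindow(now);
        w.compareToTimestamp(minTimestamp) > 0;
        w = w.nextEarlierWindow()) {
      boundaries.add(w.startMillis()); // newest window first
    }
    boundaries.add(Long.MIN_VALUE);    // catch-all for anything older
    Collections.reverse(boundaries);   // ascending order, oldest boundary first
    return boundaries;
  }
}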
ExponentialCompactionWindowFactory.java (new file):
@@ -0,0 +1,146 @@
+[Apache License 2.0 header omitted; identical to the one in CompactionWindow.java]
+package org.apache.hadoop.hbase.regionserver.compactions;
+
+import com.google.common.math.LongMath;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * Exponential compaction window implementation.
+ */
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
+public class ExponentialCompactionWindowFactory extends CompactionWindowFactory {
+
+  private static final Log LOG = LogFactory.getLog(ExponentialCompactionWindowFactory.class);
+
+  public static final String BASE_WINDOW_MILLIS_KEY =
+    "hbase.hstore.compaction.date.tiered.base.window.millis";
+  public static final String WINDOWS_PER_TIER_KEY =
+    "hbase.hstore.compaction.date.tiered.windows.per.tier";
+  public static final String MAX_TIER_AGE_MILLIS_KEY =
+    "hbase.hstore.compaction.date.tiered.max.tier.age.millis";
+
+  private final class Window extends CompactionWindow {
+
+    /**
+     * Will not promote to next tier for window before it.
+     */
+    private final long maxTierAgeCutoff;
+
+    /**
+     * How big a range of timestamps fit inside the window in milliseconds.
+     */
+    private final long windowMillis;
+
+    /**
+     * A timestamp t is within the window iff t / size == divPosition.
+     */
+    private final long divPosition;
+
+    public Window(long baseWindowMillis, long divPosition, long maxTierAgeCutoff) {
+      this.windowMillis = baseWindowMillis;
+      this.divPosition = divPosition;
+      this.maxTierAgeCutoff = maxTierAgeCutoff;
+    }
+
+    @Override
+    public int compareToTimestamp(long timestamp) {
+      if (timestamp < 0) {
+        try {
+          timestamp = LongMath.checkedSubtract(timestamp, windowMillis - 1);
+        } catch (ArithmeticException ae) {
+          timestamp = Long.MIN_VALUE;
+        }
+      }
+      long pos = timestamp / windowMillis;
+      return divPosition == pos ? 0 : divPosition < pos ? -1 : 1;
+    }
+
+    @Override
+    public Window nextEarlierWindow() {
+      // Don't promote to the next tier if there is not even 1 window at current tier
+      // or if the next window crosses the max age.
+      if (divPosition % windowsPerTier > 0
+          || startMillis() - windowMillis * windowsPerTier < maxTierAgeCutoff) {
+        return new Window(windowMillis, divPosition - 1, maxTierAgeCutoff);
+      } else {
+        return new Window(windowMillis * windowsPerTier, divPosition / windowsPerTier - 1,
+          maxTierAgeCutoff);
+      }
+    }
+
+    @Override
+    public long startMillis() {
+      try {
+        return LongMath.checkedMultiply(windowMillis, divPosition);
+      } catch (ArithmeticException ae) {
+        return Long.MIN_VALUE;
+      }
+    }
+
+    @Override
+    public long endMillis() {
+      try {
+        return LongMath.checkedMultiply(windowMillis, (divPosition + 1));
+      } catch (ArithmeticException ae) {
+        return Long.MAX_VALUE;
+      }
+    }
+  }
+
+  private final long baseWindowMillis;
+  private final int windowsPerTier;
+  private final long maxTierAgeMillis;
+
+  private long getMaxTierAgeCutoff(long now) {
+    try {
+      return LongMath.checkedSubtract(now, maxTierAgeMillis);
+    } catch (ArithmeticException ae) {
+      LOG.warn("Value for " + MAX_TIER_AGE_MILLIS_KEY + ": " + maxTierAgeMillis
+        + ". Will always promote to next tier.");
+      return Long.MIN_VALUE;
+    }
+  }
+
+  public ExponentialCompactionWindowFactory(CompactionConfiguration comConf) {
+    Configuration conf = comConf.conf;
+    baseWindowMillis = conf.getLong(BASE_WINDOW_MILLIS_KEY, 3600000 * 6);
+    windowsPerTier = conf.getInt(WINDOWS_PER_TIER_KEY, 4);
+    maxTierAgeMillis = conf.getLong(MAX_TIER_AGE_MILLIS_KEY,
+      comConf.getDateTieredMaxStoreFileAgeMillis());
+    LOG.info(this);
+  }
+
+  @Override
+  public CompactionWindow newIncomingWindow(long now) {
+    return new Window(baseWindowMillis, now / baseWindowMillis, getMaxTierAgeCutoff(now));
+  }
+
+  @Override
+  public String toString() {
+    return String.format(
+      "%s [base window in milliseconds %d, windows per tier %d, max tier age in milliseconds %d]",
+      getClass().getSimpleName(), baseWindowMillis, windowsPerTier, maxTierAgeMillis);
+  }
+
+}
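With the defaults above (6-hour base window, 4 windows per tier), walking nextEarlierWindow() from an incoming window yields contiguous intervals whose span grows by a factor of windowsPerTier at each promotion. The sketch below replays that arithmetic outside HBase, ignoring the max-tier-age cutoff; starting at position 103 it prints spans of 6h, 6h, 6h, 6h, 24h, 96h, 96h, 384h, covering [0h, 624h) with no gaps:

public class TierProgressionSketch {
  public static void main(String[] args) {
    long windowMillis = 6L * 3600 * 1000; // BASE_WINDOW_MILLIS_KEY default: 6 hours
    int windowsPerTier = 4;               // WINDOWS_PER_TIER_KEY default
    long divPosition = 103;               // pretend "now" falls in the 104th base window
    for (int i = 0; i < 8; i++) {
      long startHours = windowMillis * divPosition / 3600000L;
      long endHours = windowMillis * (divPosition + 1) / 3600000L;
      System.out.printf("window [%dh, %dh), span %dh%n",
          startHours, endHours, windowMillis / 3600000L);
      // Promotion rule from Window.nextEarlierWindow(), without the tier-age cutoff:
      if (divPosition % windowsPerTier > 0) {
        divPosition -= 1;                               // stay in the current tier
      } else {
        divPosition = divPosition / windowsPerTier - 1; // promote to the next tier
        windowMillis *= windowsPerTier;                 // windows get 4x wider
      }
    }
  }
}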
AbstractTestDateTieredCompactionPolicy.java (new file):
@@ -0,0 +1,81 @@
+[Apache License 2.0 header omitted; identical to the one in CompactionWindow.java]
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+
+import org.apache.hadoop.hbase.regionserver.compactions.DateTieredCompactionPolicy;
+import org.apache.hadoop.hbase.regionserver.compactions.DateTieredCompactionRequest;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
+import static org.junit.Assert.*;
+
+public class AbstractTestDateTieredCompactionPolicy extends TestCompactionPolicy {
+
+  protected ArrayList<StoreFile> sfCreate(long[] minTimestamps, long[] maxTimestamps, long[] sizes)
+      throws IOException {
+    ManualEnvironmentEdge timeMachine = new ManualEnvironmentEdge();
+    EnvironmentEdgeManager.injectEdge(timeMachine);
+    // Has to be > 0 and < now.
+    timeMachine.setValue(1);
+    ArrayList<Long> ageInDisk = new ArrayList<Long>();
+    for (int i = 0; i < sizes.length; i++) {
+      ageInDisk.add(0L);
+    }
+
+    ArrayList<StoreFile> ret = Lists.newArrayList();
+    for (int i = 0; i < sizes.length; i++) {
+      MockStoreFile msf =
+          new MockStoreFile(TEST_UTIL, TEST_FILE, sizes[i], ageInDisk.get(i), false, i);
+      msf.setTimeRangeTracker(new TimeRangeTracker(minTimestamps[i], maxTimestamps[i]));
+      ret.add(msf);
+    }
+    return ret;
+  }
+
+  protected void compactEquals(long now, ArrayList<StoreFile> candidates, long[] expectedFileSizes,
+      long[] expectedBoundaries, boolean isMajor, boolean toCompact) throws IOException {
+    ManualEnvironmentEdge timeMachine = new ManualEnvironmentEdge();
+    EnvironmentEdgeManager.injectEdge(timeMachine);
+    timeMachine.setValue(now);
+    DateTieredCompactionRequest request;
+    DateTieredCompactionPolicy policy =
+        (DateTieredCompactionPolicy) store.storeEngine.getCompactionPolicy();
+    if (isMajor) {
+      for (StoreFile file : candidates) {
+        ((MockStoreFile) file).setIsMajor(true);
+      }
+      assertEquals(toCompact, policy.shouldPerformMajorCompaction(candidates));
+      request = (DateTieredCompactionRequest) policy.selectMajorCompaction(candidates);
+    } else {
+      assertEquals(toCompact, policy.needsCompaction(candidates, ImmutableList.<StoreFile> of()));
+      request =
+          (DateTieredCompactionRequest) policy.selectMinorCompaction(candidates, false, false);
+    }
+    List<StoreFile> actual = Lists.newArrayList(request.getFiles());
+    assertEquals(Arrays.toString(expectedFileSizes), Arrays.toString(getSizes(actual)));
+    assertEquals(Arrays.toString(expectedBoundaries),
+      Arrays.toString(request.getBoundaries().toArray()));
+  }
+}
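Both helpers pin "now" by injecting a ManualEnvironmentEdge, which is what lets the tests below use small integer timestamps like 161. A minimal illustration of that pattern, using the standard HBase test utilities rather than anything new in this commit:

import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;

public class FrozenClockSketch {
  public static void main(String[] args) {
    ManualEnvironmentEdge clock = new ManualEnvironmentEdge();
    EnvironmentEdgeManager.injectEdge(clock); // all HBase time lookups now use this edge
    clock.setValue(161);                      // "now" as seen by the compaction policy
    System.out.println(EnvironmentEdgeManager.currentTime()); // prints 161
    EnvironmentEdgeManager.reset();           // restore the default edge afterwards
  }
}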
TestDateTieredCompactionPolicy.java:
@@ -17,47 +17,18 @@
  */
 package org.apache.hadoop.hbase.regionserver;

-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;

 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
-import org.apache.hadoop.hbase.regionserver.compactions.DateTieredCompactionPolicy;
-import org.apache.hadoop.hbase.regionserver.compactions.DateTieredCompactionRequest;
+import org.apache.hadoop.hbase.regionserver.compactions.ExponentialCompactionWindowFactory;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
-import org.junit.Assert;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;

-@Category(SmallTests.class)
-public class TestDateTieredCompactionPolicy extends TestCompactionPolicy {
-  ArrayList<StoreFile> sfCreate(long[] minTimestamps, long[] maxTimestamps, long[] sizes)
-      throws IOException {
-    ManualEnvironmentEdge timeMachine = new ManualEnvironmentEdge();
-    EnvironmentEdgeManager.injectEdge(timeMachine);
-    // Has to be > 0 and < now.
-    timeMachine.setValue(1);
-    ArrayList<Long> ageInDisk = new ArrayList<Long>();
-    for (int i = 0; i < sizes.length; i++) {
-      ageInDisk.add(0L);
-    }
-
-    ArrayList<StoreFile> ret = Lists.newArrayList();
-    for (int i = 0; i < sizes.length; i++) {
-      MockStoreFile msf =
-          new MockStoreFile(TEST_UTIL, TEST_FILE, sizes[i], ageInDisk.get(i), false, i);
-      msf.setTimeRangeTracker(new TimeRangeTracker(minTimestamps[i], maxTimestamps[i]));
-      ret.add(msf);
-    }
-    return ret;
-  }
+@Category({ RegionServerTests.class, SmallTests.class })
+public class TestDateTieredCompactionPolicy extends AbstractTestDateTieredCompactionPolicy {

   @Override
   protected void config() {
@@ -66,11 +37,12 @@ public class TestDateTieredCompactionPolicy extends TestCompactionPolicy {
     // Set up policy
     conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY,
       "org.apache.hadoop.hbase.regionserver.DateTieredStoreEngine");
-    conf.setLong(CompactionConfiguration.MAX_AGE_MILLIS_KEY, 100);
-    conf.setLong(CompactionConfiguration.INCOMING_WINDOW_MIN_KEY, 3);
-    conf.setLong(CompactionConfiguration.BASE_WINDOW_MILLIS_KEY, 6);
-    conf.setInt(CompactionConfiguration.WINDOWS_PER_TIER_KEY, 4);
-    conf.setBoolean(CompactionConfiguration.SINGLE_OUTPUT_FOR_MINOR_COMPACTION_KEY, false);
+    conf.setLong(CompactionConfiguration.DATE_TIERED_MAX_AGE_MILLIS_KEY, 100);
+    conf.setLong(CompactionConfiguration.DATE_TIERED_INCOMING_WINDOW_MIN_KEY, 3);
+    conf.setLong(ExponentialCompactionWindowFactory.BASE_WINDOW_MILLIS_KEY, 6);
+    conf.setInt(ExponentialCompactionWindowFactory.WINDOWS_PER_TIER_KEY, 4);
+    conf.setBoolean(CompactionConfiguration.DATE_TIERED_SINGLE_OUTPUT_FOR_MINOR_COMPACTION_KEY,
+      false);

     // Special settings for compaction policy per window
     this.conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 2);
@@ -81,32 +53,6 @@ public class TestDateTieredCompactionPolicy extends TestCompactionPolicy {
     conf.setLong(HConstants.MAJOR_COMPACTION_PERIOD, 10);
   }

-  void compactEquals(long now, ArrayList<StoreFile> candidates, long[] expectedFileSizes,
-    long[] expectedBoundaries, boolean isMajor, boolean toCompact) throws IOException {
-    ManualEnvironmentEdge timeMachine = new ManualEnvironmentEdge();
-    EnvironmentEdgeManager.injectEdge(timeMachine);
-    timeMachine.setValue(now);
-    DateTieredCompactionRequest request;
-    if (isMajor) {
-      for (StoreFile file : candidates) {
-        ((MockStoreFile)file).setIsMajor(true);
-      }
-      Assert.assertEquals(toCompact, ((DateTieredCompactionPolicy) store.storeEngine.getCompactionPolicy())
-        .shouldPerformMajorCompaction(candidates));
-      request = (DateTieredCompactionRequest) ((DateTieredCompactionPolicy) store.storeEngine
-        .getCompactionPolicy()).selectMajorCompaction(candidates);
-    } else {
-      Assert.assertEquals(toCompact, ((DateTieredCompactionPolicy) store.storeEngine.getCompactionPolicy())
-        .needsCompaction(candidates, ImmutableList.<StoreFile> of()));
-      request = (DateTieredCompactionRequest) ((DateTieredCompactionPolicy) store.storeEngine
-        .getCompactionPolicy()).selectMinorCompaction(candidates, false, false);
-    }
-    List<StoreFile> actual = Lists.newArrayList(request.getFiles());
-    Assert.assertEquals(Arrays.toString(expectedFileSizes), Arrays.toString(getSizes(actual)));
-    Assert.assertEquals(Arrays.toString(expectedBoundaries),
-      Arrays.toString(request.getBoundaries().toArray()));
-  }
-
   /**
    * Test for incoming window
    * @throws IOException with error
@@ -283,8 +229,9 @@ public class TestDateTieredCompactionPolicy extends TestCompactionPolicy {
     long[] maxTimestamps = new long[] { 44, 60, 61, 96, 100, 104, 105, 106, 113, 145, 157 };
     long[] sizes = new long[] { 0, 50, 51, 40, 41, 42, 33, 30, 31, 2, 1 };

-    compactEquals(161, sfCreate(minTimestamps, maxTimestamps, sizes), new long[] { 0, 50, 51, 40,41, 42,
-      33, 30, 31, 2, 1 }, new long[] { Long.MIN_VALUE, 24, 48, 72, 96, 120, 144, 150, 156 }, true, true);
+    compactEquals(161, sfCreate(minTimestamps, maxTimestamps, sizes),
+      new long[] { 0, 50, 51, 40, 41, 42, 33, 30, 31, 2, 1 },
+      new long[] { Long.MIN_VALUE, 24, 48, 72, 96, 120, 144, 150, 156 }, true, true);
   }

   /**
@@ -302,24 +249,4 @@ public class TestDateTieredCompactionPolicy extends TestCompactionPolicy {
       41, 42, 33, 30, 31, 2, 1 },
       new long[] { Long.MIN_VALUE, -144, -120, -96, -72, -48, -24, 0, 6, 12 }, true, true);
   }
-
-  /**
-   * Major compaction with maximum values
-   * @throws IOException with error
-   */
-  @Test
-  public void maxValuesForMajor() throws IOException {
-    conf.setLong(CompactionConfiguration.BASE_WINDOW_MILLIS_KEY, Long.MAX_VALUE / 2);
-    conf.setInt(CompactionConfiguration.WINDOWS_PER_TIER_KEY, 2);
-    store.storeEngine.getCompactionPolicy().setConf(conf);
-    long[] minTimestamps =
-      new long[] { Long.MIN_VALUE, -100 };
-    long[] maxTimestamps = new long[] { -8, Long.MAX_VALUE };
-    long[] sizes = new long[] { 0, 1 };
-
-    compactEquals(Long.MAX_VALUE, sfCreate(minTimestamps, maxTimestamps, sizes),
-      new long[] { 0, 1 },
-      new long[] { Long.MIN_VALUE, -4611686018427387903L, 0, 4611686018427387903L,
-        9223372036854775806L }, true, true);
-  }
 }
TestDateTieredCompactionPolicyOverflow.java (new file):
@@ -0,0 +1,70 @@
+[Apache License 2.0 header omitted; identical to the one in CompactionWindow.java]
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
+import org.apache.hadoop.hbase.regionserver.compactions.ExponentialCompactionWindowFactory;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ RegionServerTests.class, SmallTests.class })
+public class TestDateTieredCompactionPolicyOverflow extends AbstractTestDateTieredCompactionPolicy {
+
+  @Override
+  protected void config() {
+    super.config();
+
+    // Set up policy
+    conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY,
+      "org.apache.hadoop.hbase.regionserver.DateTieredStoreEngine");
+    conf.setLong(CompactionConfiguration.DATE_TIERED_MAX_AGE_MILLIS_KEY, 100);
+    conf.setLong(CompactionConfiguration.DATE_TIERED_INCOMING_WINDOW_MIN_KEY, 3);
+    conf.setLong(ExponentialCompactionWindowFactory.BASE_WINDOW_MILLIS_KEY, Long.MAX_VALUE / 2);
+    conf.setInt(ExponentialCompactionWindowFactory.WINDOWS_PER_TIER_KEY, 2);
+    conf.setBoolean(CompactionConfiguration.DATE_TIERED_SINGLE_OUTPUT_FOR_MINOR_COMPACTION_KEY,
+      false);
+
+    // Special settings for compaction policy per window
+    this.conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 2);
+    this.conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY, 12);
+    this.conf.setFloat(CompactionConfiguration.HBASE_HSTORE_COMPACTION_RATIO_KEY, 1.2F);
+
+    conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 20);
+    conf.setLong(HConstants.MAJOR_COMPACTION_PERIOD, 10);
+  }
+
+  /**
+   * Major compaction with maximum values
+   * @throws IOException with error
+   */
+  @Test
+  public void maxValuesForMajor() throws IOException {
+    long[] minTimestamps = new long[] { Long.MIN_VALUE, -100 };
+    long[] maxTimestamps = new long[] { -8, Long.MAX_VALUE };
+    long[] sizes = new long[] { 0, 1 };
+
+    compactEquals(Long.MAX_VALUE, sfCreate(minTimestamps, maxTimestamps, sizes),
+      new long[] { 0, 1 }, new long[] { Long.MIN_VALUE, -4611686018427387903L, 0,
+        4611686018427387903L, 9223372036854775806L }, true, true);
+  }
+}