HBASE-7680 implement compaction policy for stripe compactions
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1536569 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
19fd1a3310
commit
9023d46908
|
@ -53,6 +53,12 @@ public class DefaultStoreEngine extends StoreEngine<
|
|||
private static final Class<? extends RatioBasedCompactionPolicy>
|
||||
DEFAULT_COMPACTION_POLICY_CLASS = ExploringCompactionPolicy.class;
|
||||
|
||||
@Override
|
||||
public boolean needsCompaction(List<StoreFile> filesCompacting) {
|
||||
return compactionPolicy.needsCompaction(
|
||||
this.storeFileManager.getStorefiles(), filesCompacting);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void createComponents(
|
||||
Configuration conf, Store store, KVComparator kvComparator) throws IOException {
|
||||
|
|
|
@ -71,6 +71,8 @@ import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor
|
|||
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.Compactor;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.OffPeakHours;
|
||||
import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
|
@ -1194,8 +1196,8 @@ public class HStore implements Store {
|
|||
|
||||
try {
|
||||
// Ready to go. Have list of files to compact.
|
||||
List<Path> newFiles =
|
||||
this.storeEngine.getCompactor().compactForTesting(filesToCompact, isMajor);
|
||||
List<Path> newFiles = ((DefaultCompactor)this.storeEngine.getCompactor())
|
||||
.compactForTesting(filesToCompact, isMajor);
|
||||
for (Path newFile: newFiles) {
|
||||
// Move the compaction into place.
|
||||
StoreFile sf = moveFileIntoPlace(newFile);
|
||||
|
@ -1881,8 +1883,7 @@ public class HStore implements Store {
|
|||
|
||||
@Override
|
||||
public boolean needsCompaction() {
|
||||
return storeEngine.getCompactionPolicy().needsCompaction(
|
||||
this.storeEngine.getStoreFileManager().getStorefiles(), filesCompacting);
|
||||
return this.storeEngine.needsCompaction(this.filesCompacting);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -31,7 +31,6 @@ import org.apache.hadoop.classification.InterfaceStability;
|
|||
@InterfaceStability.Unstable
|
||||
public interface StoreConfigInformation {
|
||||
/**
|
||||
* TODO: remove after HBASE-7252 is fixed.
|
||||
* @return Gets the Memstore flush size for the region that this store works with.
|
||||
*/
|
||||
long getMemstoreFlushSize();
|
||||
|
|
|
@ -20,6 +20,7 @@
|
|||
package org.apache.hadoop.hbase.regionserver;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
|
@ -79,6 +80,12 @@ public abstract class StoreEngine<SF extends StoreFlusher,
|
|||
return this.storeFlusher;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param filesCompacting Files currently compacting
|
||||
* @return whether a compaction selection is possible
|
||||
*/
|
||||
public abstract boolean needsCompaction(List<StoreFile> filesCompacting);
|
||||
|
||||
/**
|
||||
* Creates an instance of a compaction context specific to this engine.
|
||||
* Doesn't actually select or start a compaction. See CompactionContext class comment.
|
||||
|
|
|
@ -285,7 +285,7 @@ public class StoreFile {
|
|||
return modificationTimeStamp;
|
||||
}
|
||||
|
||||
byte[] getMetadataValue(byte[] key) {
|
||||
public byte[] getMetadataValue(byte[] key) {
|
||||
return metadataMap.get(key);
|
||||
}
|
||||
|
||||
|
|
|
@ -18,6 +18,8 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.regionserver;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
|
||||
|
@ -28,25 +30,116 @@ import org.apache.hadoop.conf.Configuration;
|
|||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class StripeStoreConfig {
|
||||
public static final String MAX_SPLIT_IMBALANCE = "hbase.store.stripe.split.max.imbalance";
|
||||
private float maxSplitImbalance;
|
||||
static final Log LOG = LogFactory.getLog(StripeStoreConfig.class);
|
||||
|
||||
public StripeStoreConfig(Configuration config) {
|
||||
maxSplitImbalance = config.getFloat(MAX_SPLIT_IMBALANCE, 1.5f);
|
||||
if (maxSplitImbalance == 0) {
|
||||
maxSplitImbalance = 1.5f;
|
||||
/** The maximum number of files to compact within a stripe; same as for regular compaction. */
|
||||
public static final String MAX_FILES_KEY = "hbase.store.stripe.compaction.maxFiles";
|
||||
/** The minimum number of files to compact within a stripe; same as for regular compaction. */
|
||||
public static final String MIN_FILES_KEY = "hbase.store.stripe.compaction.minFiles";
|
||||
|
||||
/** The minimum number of files to compact when compacting L0; same as minFiles for regular
|
||||
* compaction. Given that L0 causes unnecessary overwriting of the data, should be higher than
|
||||
* regular minFiles. */
|
||||
public static final String MIN_FILES_L0_KEY = "hbase.store.stripe.compaction.minFilesL0";
|
||||
|
||||
/** The size the stripe should achieve to be considered for splitting into multiple stripes.
|
||||
Stripe will be split when it can be fully compacted, and it is above this size. */
|
||||
public static final String SIZE_TO_SPLIT_KEY = "hbase.store.stripe.sizeToSplit";
|
||||
/** The target count of new stripes to produce when splitting a stripe. A floating point
|
||||
number, default is 2. Values less than 1 will be converted to 1/x. Non-whole numbers will
|
||||
produce unbalanced splits, which may be good for some cases. In this case the "smaller" of
|
||||
the new stripes will always be the rightmost one. If the stripe is bigger than sizeToSplit
|
||||
when splitting, this will be adjusted by a whole increment. */
|
||||
public static final String SPLIT_PARTS_KEY = "hbase.store.stripe.splitPartCount";
|
||||
/** The initial stripe count to create. If the row distribution is roughly the same over time,
|
||||
it's good to set this to a count of stripes that is expected to be achieved in most regions,
|
||||
to get this count from the outset and prevent unnecessary splitting. */
|
||||
public static final String INITIAL_STRIPE_COUNT_KEY = "hbase.store.stripe.initialStripeCount";
|
||||
|
||||
/** When splitting region, the maximum size imbalance to allow in an attempt to split at a
|
||||
stripe boundary, so that no files go to both regions. Most users won't need to change that. */
|
||||
public static final String MAX_REGION_SPLIT_IMBALANCE_KEY =
|
||||
"hbase.store.stripe.region.split.max.imbalance";
|
||||
|
||||
|
||||
private final float maxRegionSplitImbalance;
|
||||
private final int level0CompactMinFiles;
|
||||
private final int stripeCompactMinFiles;
|
||||
private final int stripeCompactMaxFiles;
|
||||
|
||||
private final int initialCount;
|
||||
private final long sizeToSplitAt;
|
||||
private final float splitPartCount;
|
||||
private final long splitPartSize; // derived from sizeToSplitAt and splitPartCount
|
||||
|
||||
private final double EPSILON = 0.001; // good enough for this, not a real epsilon.
|
||||
public StripeStoreConfig(Configuration config, StoreConfigInformation sci) {
|
||||
this.level0CompactMinFiles = config.getInt(MIN_FILES_L0_KEY, 4);
|
||||
this.stripeCompactMinFiles = config.getInt(MIN_FILES_KEY, 3);
|
||||
this.stripeCompactMaxFiles = config.getInt(MAX_FILES_KEY, 10);
|
||||
this.maxRegionSplitImbalance = getFloat(config, MAX_REGION_SPLIT_IMBALANCE_KEY, 1.5f, true);
|
||||
|
||||
this.splitPartCount = getFloat(config, SPLIT_PARTS_KEY, 2.0f, true);
|
||||
if (Math.abs(splitPartCount - 1.0) < EPSILON) {
|
||||
throw new RuntimeException("Split part count cannot be 1: " + this.splitPartCount);
|
||||
}
|
||||
if (maxSplitImbalance < 1f) {
|
||||
maxSplitImbalance = 1f / maxSplitImbalance;
|
||||
// TODO: change when no L0.
|
||||
// Arbitrary default split size - 4 times the size of one L0 compaction.
|
||||
double flushSize = sci.getMemstoreFlushSize();
|
||||
if (flushSize == 0) {
|
||||
flushSize = 128 * 1024 * 1024;
|
||||
}
|
||||
long defaultSplitSize = (long)(flushSize * getLevel0MinFiles() * 4 * splitPartCount);
|
||||
this.sizeToSplitAt = config.getLong(SIZE_TO_SPLIT_KEY, defaultSplitSize);
|
||||
this.initialCount = config.getInt(INITIAL_STRIPE_COUNT_KEY, 1);
|
||||
this.splitPartSize = (long)(this.sizeToSplitAt / this.splitPartCount);
|
||||
}
|
||||
|
||||
private static float getFloat(
|
||||
Configuration config, String key, float defaultValue, boolean moreThanOne) {
|
||||
float value = config.getFloat(key, defaultValue);
|
||||
if (value == 0) {
|
||||
LOG.warn(String.format("%s is set to 0; using default value of %f", key, defaultValue));
|
||||
value = defaultValue;
|
||||
} else if ((value > 1f) != moreThanOne) {
|
||||
value = 1f / value;
|
||||
}
|
||||
return value;
|
||||
}
|
||||
|
||||
public float getMaxSplitImbalance() {
|
||||
return this.maxRegionSplitImbalance;
|
||||
}
|
||||
|
||||
public int getLevel0MinFiles() {
|
||||
return level0CompactMinFiles;
|
||||
}
|
||||
|
||||
public int getStripeCompactMinFiles() {
|
||||
return stripeCompactMinFiles;
|
||||
}
|
||||
|
||||
public int getStripeCompactMaxFiles() {
|
||||
return stripeCompactMaxFiles;
|
||||
}
|
||||
|
||||
public long getSplitSize() {
|
||||
return sizeToSplitAt;
|
||||
}
|
||||
|
||||
public int getInitialCount() {
|
||||
return initialCount;
|
||||
}
|
||||
|
||||
public float getSplitCount() {
|
||||
return splitPartCount;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the maximum imbalance to tolerate between sides when splitting the region
|
||||
* at the stripe boundary. If the ratio of a larger to a smaller side of the split on
|
||||
* the stripe-boundary is bigger than this, then some stripe will be split.
|
||||
* @return the desired size of the target stripe when splitting, in bytes.
|
||||
* Derived from {@link #getSplitSize()} and {@link #getSplitCount()}.
|
||||
*/
|
||||
public float getMaxSplitImbalance() {
|
||||
return this.maxSplitImbalance;
|
||||
public long getSplitPartSize() {
|
||||
return splitPartSize;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,104 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.regionserver;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.KeyValue.KVComparator;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactionPolicy;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactor;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
|
||||
/**
|
||||
* The storage engine that implements the stripe-based store/compaction scheme.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class StripeStoreEngine extends StoreEngine<DefaultStoreFlusher,
|
||||
StripeCompactionPolicy, StripeCompactor, StripeStoreFileManager> {
|
||||
static final Log LOG = LogFactory.getLog(StripeStoreEngine.class);
|
||||
private StripeStoreConfig config;
|
||||
|
||||
@Override
|
||||
public boolean needsCompaction(List<StoreFile> filesCompacting) {
|
||||
return this.compactionPolicy.needsCompactions(this.storeFileManager, filesCompacting);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompactionContext createCompaction() {
|
||||
return new StripeCompaction();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void createComponents(
|
||||
Configuration conf, Store store, KVComparator comparator) throws IOException {
|
||||
this.config = new StripeStoreConfig(conf, store);
|
||||
this.compactionPolicy = new StripeCompactionPolicy(conf, store, config);
|
||||
this.storeFlusher = new DefaultStoreFlusher(conf, store);
|
||||
this.storeFileManager = new StripeStoreFileManager(comparator, conf, this.config);
|
||||
this.compactor = new StripeCompactor(conf, store);
|
||||
}
|
||||
|
||||
/**
|
||||
* Represents one instance of stripe compaction, with the necessary context and flow.
|
||||
*/
|
||||
private class StripeCompaction extends CompactionContext {
|
||||
private StripeCompactionPolicy.StripeCompactionRequest stripeRequest = null;
|
||||
|
||||
@Override
|
||||
public List<StoreFile> preSelect(List<StoreFile> filesCompacting) {
|
||||
return compactionPolicy.preSelectFilesForCoprocessor(storeFileManager, filesCompacting);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean select(List<StoreFile> filesCompacting, boolean isUserCompaction,
|
||||
boolean mayUseOffPeak, boolean forceMajor) throws IOException {
|
||||
this.stripeRequest = compactionPolicy.selectCompaction(
|
||||
storeFileManager, filesCompacting, mayUseOffPeak);
|
||||
this.request = (this.stripeRequest == null)
|
||||
? new CompactionRequest(new ArrayList<StoreFile>()) : this.stripeRequest.getRequest();
|
||||
return this.stripeRequest != null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void forceSelect(CompactionRequest request) {
|
||||
super.forceSelect(request);
|
||||
if (this.stripeRequest != null) {
|
||||
this.stripeRequest.setRequest(this.request);
|
||||
} else {
|
||||
LOG.warn("Stripe store is forced to take an arbitrary file list and compact it.");
|
||||
this.stripeRequest = compactionPolicy.createEmptyRequest(storeFileManager, this.request);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Path> compact() throws IOException {
|
||||
Preconditions.checkArgument(this.stripeRequest != null, "Cannot compact without selection");
|
||||
return this.stripeRequest.execute(compactor);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -35,6 +35,7 @@ import org.apache.hadoop.conf.Configuration;
|
|||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.KeyValue.KVComparator;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactionPolicy;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.ConcatenatedLists;
|
||||
|
||||
|
@ -58,7 +59,8 @@ import com.google.common.collect.ImmutableList;
|
|||
* - Compaction has one contiguous set of stripes both in and out, except if L0 is involved.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
class StripeStoreFileManager implements StoreFileManager {
|
||||
public class StripeStoreFileManager
|
||||
implements StoreFileManager, StripeCompactionPolicy.StripeInformationProvider {
|
||||
static final Log LOG = LogFactory.getLog(StripeStoreFileManager.class);
|
||||
|
||||
/**
|
||||
|
@ -81,16 +83,16 @@ class StripeStoreFileManager implements StoreFileManager {
|
|||
*/
|
||||
private static class State {
|
||||
/**
|
||||
* The end keys of each stripe. The last stripe end is always open-ended, so it's not stored
|
||||
* here. It is invariant that the start key of the stripe is the end key of the previous one
|
||||
* The end rows of each stripe. The last stripe end is always open-ended, so it's not stored
|
||||
* here. It is invariant that the start row of the stripe is the end row of the previous one
|
||||
* (and is an open boundary for the first one).
|
||||
*/
|
||||
public byte[][] stripeEndRows = new byte[0][];
|
||||
|
||||
/**
|
||||
* Files by stripe. Each element of the list corresponds to stripeEndKey with the corresponding
|
||||
* index, except the last one. Inside each list, the files are in reverse order by seqNum.
|
||||
* Note that the length of this is one higher than that of stripeEndKeys.
|
||||
* Files by stripe. Each element of the list corresponds to stripeEndRow element with the
|
||||
* same index, except the last one. Inside each list, the files are in reverse order by
|
||||
* seqNum. Note that the length of this is one higher than that of stripeEndKeys.
|
||||
*/
|
||||
public ArrayList<ImmutableList<StoreFile>> stripeFiles
|
||||
= new ArrayList<ImmutableList<StoreFile>>();
|
||||
|
@ -105,16 +107,20 @@ class StripeStoreFileManager implements StoreFileManager {
|
|||
/** Cached file metadata (or overrides as the case may be) */
|
||||
private HashMap<StoreFile, byte[]> fileStarts = new HashMap<StoreFile, byte[]>();
|
||||
private HashMap<StoreFile, byte[]> fileEnds = new HashMap<StoreFile, byte[]>();
|
||||
/** Normally invalid key is null, but in the map null is the result for "no key"; so use
|
||||
* the following constant value in these maps instead. Note that this is a constant and
|
||||
* we use it to compare by reference when we read from the map. */
|
||||
private static final byte[] INVALID_KEY_IN_MAP = new byte[0];
|
||||
|
||||
private final KVComparator kvComparator;
|
||||
private StripeStoreConfig config;
|
||||
|
||||
private final int blockingFileCount;
|
||||
|
||||
public StripeStoreFileManager(KVComparator kvComparator, Configuration conf) throws Exception {
|
||||
public StripeStoreFileManager(
|
||||
KVComparator kvComparator, Configuration conf, StripeStoreConfig config) {
|
||||
this.kvComparator = kvComparator;
|
||||
// TODO: create this in a shared manner in StoreEngine when there's one
|
||||
this.config = new StripeStoreConfig(conf);
|
||||
this.config = config;
|
||||
this.blockingFileCount = conf.getInt(
|
||||
HStore.BLOCKING_STOREFILES_KEY, HStore.DEFAULT_BLOCKING_STOREFILE_COUNT);
|
||||
}
|
||||
|
@ -378,7 +384,7 @@ class StripeStoreFileManager implements StoreFileManager {
|
|||
while (entryIter.hasNext()) {
|
||||
Map.Entry<byte[], ArrayList<StoreFile>> entry = entryIter.next();
|
||||
ArrayList<StoreFile> files = entry.getValue();
|
||||
// Validate the file start keys, and remove the bad ones to level 0.
|
||||
// Validate the file start rows, and remove the bad ones to level 0.
|
||||
for (int i = 0; i < files.size(); ++i) {
|
||||
StoreFile sf = files.get(i);
|
||||
byte[] startRow = startOf(sf);
|
||||
|
@ -459,15 +465,8 @@ class StripeStoreFileManager implements StoreFileManager {
|
|||
}
|
||||
|
||||
private void ensureLevel0Metadata(StoreFile sf) {
|
||||
if (!isInvalid(startOf(sf))) this.fileStarts.put(sf, null);
|
||||
if (!isInvalid(endOf(sf))) this.fileEnds.put(sf, null);
|
||||
}
|
||||
|
||||
/**
|
||||
* For testing.
|
||||
*/
|
||||
List<StoreFile> getLevel0Files() {
|
||||
return state.level0Files;
|
||||
if (!isInvalid(startOf(sf))) this.fileStarts.put(sf, INVALID_KEY_IN_MAP);
|
||||
if (!isInvalid(endOf(sf))) this.fileEnds.put(sf, INVALID_KEY_IN_MAP);
|
||||
}
|
||||
|
||||
private void debugDumpState(String string) {
|
||||
|
@ -515,7 +514,7 @@ class StripeStoreFileManager implements StoreFileManager {
|
|||
}
|
||||
|
||||
/**
|
||||
* Finds the stripe index by end key.
|
||||
* Finds the stripe index by end row.
|
||||
*/
|
||||
private final int findStripeIndexByEndRow(byte[] endRow) {
|
||||
assert !isInvalid(endRow);
|
||||
|
@ -524,7 +523,7 @@ class StripeStoreFileManager implements StoreFileManager {
|
|||
}
|
||||
|
||||
/**
|
||||
* Finds the stripe index for the stripe containing a key provided externally for get/scan.
|
||||
* Finds the stripe index for the stripe containing a row provided externally for get/scan.
|
||||
*/
|
||||
private final int findStripeForRow(byte[] row, boolean isStart) {
|
||||
if (isStart && row == HConstants.EMPTY_START_ROW) return 0;
|
||||
|
@ -537,33 +536,28 @@ class StripeStoreFileManager implements StoreFileManager {
|
|||
return Math.abs(Arrays.binarySearch(state.stripeEndRows, row, Bytes.BYTES_COMPARATOR) + 1);
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the start key for a given stripe.
|
||||
* @param stripeIndex Stripe index.
|
||||
* @return Start key. May be an open key.
|
||||
*/
|
||||
@Override
|
||||
public final byte[] getStartRow(int stripeIndex) {
|
||||
return (stripeIndex == 0 ? OPEN_KEY : state.stripeEndRows[stripeIndex - 1]);
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the start key for a given stripe.
|
||||
* @param stripeIndex Stripe index.
|
||||
* @return Start key. May be an open key.
|
||||
*/
|
||||
@Override
|
||||
public final byte[] getEndRow(int stripeIndex) {
|
||||
return (stripeIndex == state.stripeEndRows.length
|
||||
? OPEN_KEY : state.stripeEndRows[stripeIndex]);
|
||||
}
|
||||
|
||||
|
||||
private byte[] startOf(StoreFile sf) {
|
||||
byte[] result = this.fileStarts.get(sf);
|
||||
return result != null ? result : sf.getMetadataValue(STRIPE_START_KEY);
|
||||
return result == null ? sf.getMetadataValue(STRIPE_START_KEY)
|
||||
: (result == INVALID_KEY_IN_MAP ? INVALID_KEY : result);
|
||||
}
|
||||
|
||||
private byte[] endOf(StoreFile sf) {
|
||||
byte[] result = this.fileEnds.get(sf);
|
||||
return result != null ? result : sf.getMetadataValue(STRIPE_END_KEY);
|
||||
return result == null ? sf.getMetadataValue(STRIPE_END_KEY)
|
||||
: (result == INVALID_KEY_IN_MAP ? INVALID_KEY : result);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -793,8 +787,8 @@ class StripeStoreFileManager implements StoreFileManager {
|
|||
|
||||
/**
|
||||
* See {@link #addCompactionResults(Collection, Collection)} - updates the stripe list with
|
||||
* new candidate stripes/removes old stripes; produces new set of stripe end keys.
|
||||
* @param newStripes New stripes - files by end key.
|
||||
* new candidate stripes/removes old stripes; produces new set of stripe end rows.
|
||||
* @param newStripes New stripes - files by end row.
|
||||
*/
|
||||
private void processNewCandidateStripes(
|
||||
TreeMap<byte[], StoreFile> newStripes) throws IOException {
|
||||
|
@ -859,4 +853,31 @@ class StripeStoreFileManager implements StoreFileManager {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<StoreFile> getLevel0Files() {
|
||||
return this.state.level0Files;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<byte[]> getStripeBoundaries() {
|
||||
if (this.state.stripeFiles.isEmpty()) return new ArrayList<byte[]>();
|
||||
ArrayList<byte[]> result = new ArrayList<byte[]>(this.state.stripeEndRows.length + 2);
|
||||
result.add(OPEN_KEY);
|
||||
for (int i = 0; i < this.state.stripeEndRows.length; ++i) {
|
||||
result.add(this.state.stripeEndRows[i]);
|
||||
}
|
||||
result.add(OPEN_KEY);
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ArrayList<ImmutableList<StoreFile>> getStripes() {
|
||||
return this.state.stripeFiles;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getStripeCount() {
|
||||
return this.state.stripeFiles.size();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -47,6 +47,7 @@ public class CompactionConfiguration {
|
|||
static final Log LOG = LogFactory.getLog(CompactionConfiguration.class);
|
||||
|
||||
private static final String CONFIG_PREFIX = "hbase.hstore.compaction.";
|
||||
public static final String RATIO_KEY = CONFIG_PREFIX + "ratio";
|
||||
|
||||
Configuration conf;
|
||||
StoreConfigInformation storeConfigInfo;
|
||||
|
@ -72,7 +73,7 @@ public class CompactionConfiguration {
|
|||
minFilesToCompact = Math.max(2, conf.getInt(CONFIG_PREFIX + "min",
|
||||
/*old name*/ conf.getInt("hbase.hstore.compactionThreshold", 3)));
|
||||
maxFilesToCompact = conf.getInt(CONFIG_PREFIX + "max", 10);
|
||||
compactionRatio = conf.getFloat(CONFIG_PREFIX + "ratio", 1.2F);
|
||||
compactionRatio = conf.getFloat(RATIO_KEY, 1.2F);
|
||||
offPeekCompactionRatio = conf.getFloat(CONFIG_PREFIX + "ratio.offpeak", 5.0F);
|
||||
|
||||
throttlePoint = conf.getLong("hbase.regionserver.thread.compaction.throttle",
|
||||
|
|
|
@ -55,14 +55,6 @@ public abstract class CompactionPolicy {
|
|||
*/
|
||||
public abstract boolean throttleCompaction(long compactionSize);
|
||||
|
||||
/**
|
||||
* @param storeFiles Current store files.
|
||||
* @param filesCompacting files currently compacting.
|
||||
* @return whether a compactionSelection is possible
|
||||
*/
|
||||
public abstract boolean needsCompaction(final Collection<StoreFile> storeFiles,
|
||||
final List<StoreFile> filesCompacting);
|
||||
|
||||
/**
|
||||
* Inform the policy that some configuration has been change,
|
||||
* so cached value should be updated it any.
|
||||
|
|
|
@ -79,32 +79,6 @@ public abstract class Compactor {
|
|||
void append(KeyValue kv) throws IOException;
|
||||
}
|
||||
|
||||
/**
|
||||
* Do a minor/major compaction on an explicit set of storefiles from a Store.
|
||||
* @param request the requested compaction
|
||||
* @return Product of compaction or an empty list if all cells expired or deleted and nothing made
|
||||
* it through the compaction.
|
||||
* @throws IOException
|
||||
*/
|
||||
public abstract List<Path> compact(final CompactionRequest request) throws IOException;
|
||||
|
||||
/**
|
||||
* Compact a list of files for testing. Creates a fake {@link CompactionRequest} to pass to
|
||||
* {@link #compact(CompactionRequest)};
|
||||
* @param filesToCompact the files to compact. These are used as the compactionSelection for the
|
||||
* generated {@link CompactionRequest}.
|
||||
* @param isMajor true to major compact (prune all deletes, max versions, etc)
|
||||
* @return Product of compaction or an empty list if all cells expired or deleted and nothing made
|
||||
* it through the compaction.
|
||||
* @throws IOException
|
||||
*/
|
||||
public List<Path> compactForTesting(final Collection<StoreFile> filesToCompact, boolean isMajor)
|
||||
throws IOException {
|
||||
CompactionRequest cr = new CompactionRequest(filesToCompact);
|
||||
cr.setIsMajor(isMajor);
|
||||
return this.compact(cr);
|
||||
}
|
||||
|
||||
public CompactionProgress getProgress() {
|
||||
return this.progress;
|
||||
}
|
||||
|
|
|
@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.regionserver.compactions;
|
|||
import java.io.IOException;
|
||||
import java.io.InterruptedIOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
|
@ -94,4 +95,21 @@ public class DefaultCompactor extends Compactor {
|
|||
}
|
||||
return newFiles;
|
||||
}
|
||||
|
||||
/**
|
||||
* Compact a list of files for testing. Creates a fake {@link CompactionRequest} to pass to
|
||||
* {@link #compact(CompactionRequest)};
|
||||
* @param filesToCompact the files to compact. These are used as the compactionSelection for
|
||||
* the generated {@link CompactionRequest}.
|
||||
* @param isMajor true to major compact (prune all deletes, max versions, etc)
|
||||
* @return Product of compaction or an empty list if all cells expired or deleted and nothing \
|
||||
* made it through the compaction.
|
||||
* @throws IOException
|
||||
*/
|
||||
public List<Path> compactForTesting(final Collection<StoreFile> filesToCompact, boolean isMajor)
|
||||
throws IOException {
|
||||
CompactionRequest cr = new CompactionRequest(filesToCompact);
|
||||
cr.setIsMajor(isMajor);
|
||||
return this.compact(cr);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -53,9 +53,19 @@ public class ExploringCompactionPolicy extends RatioBasedCompactionPolicy {
|
|||
@Override
|
||||
final ArrayList<StoreFile> applyCompactionPolicy(final ArrayList<StoreFile> candidates,
|
||||
final boolean mayUseOffPeak, final boolean mightBeStuck) throws IOException {
|
||||
return new ArrayList<StoreFile>(applyCompactionPolicy(candidates, mightBeStuck,
|
||||
mayUseOffPeak, comConf.getMinFilesToCompact(), comConf.getMaxFilesToCompact()));
|
||||
}
|
||||
|
||||
public List<StoreFile> applyCompactionPolicy(final List<StoreFile> candidates,
|
||||
boolean mightBeStuck, boolean mayUseOffPeak, int minFiles, int maxFiles) {
|
||||
|
||||
final double currentRatio = mayUseOffPeak
|
||||
? comConf.getCompactionRatioOffPeak() : comConf.getCompactionRatio();
|
||||
|
||||
// Start off choosing nothing.
|
||||
List<StoreFile> bestSelection = new ArrayList<StoreFile>(0);
|
||||
List<StoreFile> smallest = new ArrayList<StoreFile>(0);
|
||||
List<StoreFile> smallest = mightBeStuck ? new ArrayList<StoreFile>(0) : null;
|
||||
long bestSize = 0;
|
||||
long smallestSize = Long.MAX_VALUE;
|
||||
|
||||
|
@ -63,15 +73,15 @@ public class ExploringCompactionPolicy extends RatioBasedCompactionPolicy {
|
|||
// Consider every starting place.
|
||||
for (int start = 0; start < candidates.size(); start++) {
|
||||
// Consider every different sub list permutation in between start and end with min files.
|
||||
for (int currentEnd = start + comConf.getMinFilesToCompact() - 1;
|
||||
for (int currentEnd = start + minFiles - 1;
|
||||
currentEnd < candidates.size(); currentEnd++) {
|
||||
List<StoreFile> potentialMatchFiles = candidates.subList(start, currentEnd + 1);
|
||||
|
||||
// Sanity checks
|
||||
if (potentialMatchFiles.size() < comConf.getMinFilesToCompact()) {
|
||||
if (potentialMatchFiles.size() < minFiles) {
|
||||
continue;
|
||||
}
|
||||
if (potentialMatchFiles.size() > comConf.getMaxFilesToCompact()) {
|
||||
if (potentialMatchFiles.size() > maxFiles) {
|
||||
continue;
|
||||
}
|
||||
|
||||
|
@ -81,7 +91,7 @@ public class ExploringCompactionPolicy extends RatioBasedCompactionPolicy {
|
|||
|
||||
// Store the smallest set of files. This stored set of files will be used
|
||||
// if it looks like the algorithm is stuck.
|
||||
if (size < smallestSize) {
|
||||
if (mightBeStuck && size < smallestSize) {
|
||||
smallest = potentialMatchFiles;
|
||||
smallestSize = size;
|
||||
}
|
||||
|
@ -92,7 +102,7 @@ public class ExploringCompactionPolicy extends RatioBasedCompactionPolicy {
|
|||
|
||||
++opts;
|
||||
if (size >= comConf.getMinCompactSize()
|
||||
&& !filesInRatio(potentialMatchFiles, mayUseOffPeak)) {
|
||||
&& !filesInRatio(potentialMatchFiles, currentRatio)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
|
@ -150,15 +160,13 @@ public class ExploringCompactionPolicy extends RatioBasedCompactionPolicy {
|
|||
* FileSize(i) <= ( Sum(0,N,FileSize(_)) - FileSize(i) ) * Ratio.
|
||||
*
|
||||
* @param files List of store files to consider as a compaction candidate.
|
||||
* @param isOffPeak should the offPeak compaction ratio be used ?
|
||||
* @param currentRatio The ratio to use.
|
||||
* @return a boolean if these files satisfy the ratio constraints.
|
||||
*/
|
||||
private boolean filesInRatio(final List<StoreFile> files, final boolean isOffPeak) {
|
||||
private boolean filesInRatio(final List<StoreFile> files, final double currentRatio) {
|
||||
if (files.size() < 2) {
|
||||
return true;
|
||||
return true;
|
||||
}
|
||||
final double currentRatio =
|
||||
isOffPeak ? comConf.getCompactionRatioOffPeak() : comConf.getCompactionRatio();
|
||||
|
||||
long totalFileSize = getTotalStoreSize(files);
|
||||
|
||||
|
|
|
@ -388,7 +388,6 @@ public class RatioBasedCompactionPolicy extends CompactionPolicy {
|
|||
return compactionSize > comConf.getThrottlePoint();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean needsCompaction(final Collection<StoreFile> storeFiles,
|
||||
final List<StoreFile> filesCompacting) {
|
||||
int numCandidates = storeFiles.size() - filesCompacting.size();
|
||||
|
|
|
@ -0,0 +1,563 @@
|
|||
/**
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.regionserver.compactions;
|
||||
|
||||
import static org.apache.hadoop.hbase.regionserver.StripeStoreFileManager.OPEN_KEY;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.regionserver.StoreConfigInformation;
|
||||
import org.apache.hadoop.hbase.regionserver.StoreFile;
|
||||
import org.apache.hadoop.hbase.regionserver.StoreUtils;
|
||||
import org.apache.hadoop.hbase.regionserver.StripeStoreConfig;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.ConcatenatedLists;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
|
||||
import com.google.common.collect.ImmutableList;
|
||||
|
||||
/**
|
||||
* Stripe store implementation of compaction policy.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class StripeCompactionPolicy extends CompactionPolicy {
|
||||
private final static Log LOG = LogFactory.getLog(StripeCompactionPolicy.class);
|
||||
// Policy used to compact individual stripes.
|
||||
private ExploringCompactionPolicy stripePolicy = null;
|
||||
|
||||
private StripeStoreConfig config;
|
||||
|
||||
public StripeCompactionPolicy(
|
||||
Configuration conf, StoreConfigInformation storeConfigInfo, StripeStoreConfig config) {
|
||||
super(conf, storeConfigInfo);
|
||||
this.config = config;
|
||||
stripePolicy = new ExploringCompactionPolicy(conf, storeConfigInfo);
|
||||
}
|
||||
|
||||
public List<StoreFile> preSelectFilesForCoprocessor(StripeInformationProvider si,
|
||||
List<StoreFile> filesCompacting) {
|
||||
// We sincerely hope nobody is messing with us with their coprocessors.
|
||||
// If they do, they are very likely to shoot themselves in the foot.
|
||||
// We'll just exclude all the filesCompacting from the list.
|
||||
ArrayList<StoreFile> candidateFiles = new ArrayList<StoreFile>(si.getStorefiles());
|
||||
candidateFiles.removeAll(filesCompacting);
|
||||
return candidateFiles;
|
||||
}
|
||||
|
||||
public StripeCompactionRequest createEmptyRequest(
|
||||
StripeInformationProvider si, CompactionRequest request) {
|
||||
// Treat as L0-ish compaction with fixed set of files, and hope for the best.
|
||||
if (si.getStripeCount() > 0) {
|
||||
return new BoundaryStripeCompactionRequest(request, si.getStripeBoundaries());
|
||||
}
|
||||
int initialCount = this.config.getInitialCount();
|
||||
long targetKvs = estimateTargetKvs(request.getFiles(), initialCount).getFirst();
|
||||
return new SplitStripeCompactionRequest(request, OPEN_KEY, OPEN_KEY, targetKvs);
|
||||
}
|
||||
|
||||
public StripeCompactionRequest selectCompaction(StripeInformationProvider si,
|
||||
List<StoreFile> filesCompacting, boolean isOffpeak) throws IOException {
|
||||
// TODO: first cut - no parallel compactions. To have more fine grained control we
|
||||
// probably need structure more sophisticated than a list.
|
||||
if (!filesCompacting.isEmpty()) {
|
||||
LOG.debug("Not selecting compaction: " + filesCompacting.size() + " files compacting");
|
||||
return null;
|
||||
}
|
||||
|
||||
// We are going to do variations of compaction in strict order of preference.
|
||||
// A better/more advanced approach is to use a heuristic to see which one is "more
|
||||
// necessary" at current time.
|
||||
|
||||
// This can happen due to region split. We can skip it later; for now preserve
|
||||
// compact-all-things behavior.
|
||||
Collection<StoreFile> allFiles = si.getStorefiles();
|
||||
if (StoreUtils.hasReferences(allFiles)) {
|
||||
LOG.debug("There are references in the store; compacting all files");
|
||||
long targetKvs = estimateTargetKvs(allFiles, config.getSplitCount()).getFirst();
|
||||
SplitStripeCompactionRequest request = new SplitStripeCompactionRequest(
|
||||
allFiles, OPEN_KEY, OPEN_KEY, targetKvs);
|
||||
request.setMajorRangeFull();
|
||||
return request;
|
||||
}
|
||||
|
||||
int stripeCount = si.getStripeCount();
|
||||
List<StoreFile> l0Files = si.getLevel0Files();
|
||||
|
||||
// See if we need to make new stripes.
|
||||
boolean shouldCompactL0 = (this.config.getLevel0MinFiles() <= l0Files.size());
|
||||
if (stripeCount == 0) {
|
||||
if (!shouldCompactL0) return null; // nothing to do.
|
||||
return selectNewStripesCompaction(si);
|
||||
}
|
||||
|
||||
boolean canDropDeletesNoL0 = l0Files.size() == 0;
|
||||
if (shouldCompactL0) {
|
||||
if (!canDropDeletesNoL0) {
|
||||
// If we need to compact L0, see if we can add something to it, and drop deletes.
|
||||
StripeCompactionRequest result = selectSingleStripeCompaction(
|
||||
si, true, canDropDeletesNoL0, isOffpeak);
|
||||
if (result != null) return result;
|
||||
}
|
||||
LOG.debug("Selecting L0 compaction with " + l0Files.size() + " files");
|
||||
return new BoundaryStripeCompactionRequest(l0Files, si.getStripeBoundaries());
|
||||
}
|
||||
|
||||
// Try to delete fully expired stripes
|
||||
StripeCompactionRequest result = selectExpiredMergeCompaction(si, canDropDeletesNoL0);
|
||||
if (result != null) return result;
|
||||
|
||||
// Ok, nothing special here, let's see if we need to do a common compaction.
|
||||
// This will also split the stripes that are too big if needed.
|
||||
return selectSingleStripeCompaction(si, false, canDropDeletesNoL0, isOffpeak);
|
||||
}
|
||||
|
||||
public boolean needsCompactions(StripeInformationProvider si, List<StoreFile> filesCompacting) {
|
||||
// Approximation on whether we need compaction.
|
||||
return filesCompacting.isEmpty()
|
||||
&& (StoreUtils.hasReferences(si.getStorefiles())
|
||||
|| (si.getLevel0Files().size() >= this.config.getLevel0MinFiles())
|
||||
|| needsSingleStripeCompaction(si));
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isMajorCompaction(Collection<StoreFile> filesToCompact) throws IOException {
|
||||
return false; // there's never a major compaction!
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean throttleCompaction(long compactionSize) {
|
||||
return compactionSize > comConf.getThrottlePoint();
|
||||
}
|
||||
|
||||
/**
|
||||
* @param si StoreFileManager.
|
||||
* @return Whether any stripe potentially needs compaction.
|
||||
*/
|
||||
protected boolean needsSingleStripeCompaction(StripeInformationProvider si) {
|
||||
int minFiles = this.config.getStripeCompactMinFiles();
|
||||
for (List<StoreFile> stripe : si.getStripes()) {
|
||||
if (stripe.size() >= minFiles) return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
protected StripeCompactionRequest selectSingleStripeCompaction(StripeInformationProvider si,
|
||||
boolean includeL0, boolean canDropDeletesWithoutL0, boolean isOffpeak) throws IOException {
|
||||
ArrayList<ImmutableList<StoreFile>> stripes = si.getStripes();
|
||||
|
||||
int bqIndex = -1;
|
||||
List<StoreFile> bqSelection = null;
|
||||
int stripeCount = stripes.size();
|
||||
long bqTotalSize = -1;
|
||||
for (int i = 0; i < stripeCount; ++i) {
|
||||
// If we want to compact L0 to drop deletes, we only want whole-stripe compactions.
|
||||
// So, pass includeL0 as 2nd parameter to indicate that.
|
||||
List<StoreFile> selection = selectSimpleCompaction(stripes.get(i),
|
||||
!canDropDeletesWithoutL0 && includeL0, isOffpeak);
|
||||
if (selection.isEmpty()) continue;
|
||||
long size = 0;
|
||||
for (StoreFile sf : selection) {
|
||||
size += sf.getReader().length();
|
||||
}
|
||||
if (bqSelection == null || selection.size() > bqSelection.size() ||
|
||||
(selection.size() == bqSelection.size() && size < bqTotalSize)) {
|
||||
bqSelection = selection;
|
||||
bqIndex = i;
|
||||
bqTotalSize = size;
|
||||
}
|
||||
}
|
||||
if (bqSelection == null) {
|
||||
LOG.debug("No good compaction is possible in any stripe");
|
||||
return null;
|
||||
}
|
||||
List<StoreFile> filesToCompact = new ArrayList<StoreFile>(bqSelection);
|
||||
// See if we can, and need to, split this stripe.
|
||||
int targetCount = 1;
|
||||
long targetKvs = Long.MAX_VALUE;
|
||||
boolean hasAllFiles = filesToCompact.size() == stripes.get(bqIndex).size();
|
||||
String splitString = "";
|
||||
if (hasAllFiles && bqTotalSize >= config.getSplitSize()) {
|
||||
if (includeL0) {
|
||||
// We want to avoid the scenario where we compact a stripe w/L0 and then split it.
|
||||
// So, if we might split, don't compact the stripe with L0.
|
||||
return null;
|
||||
}
|
||||
Pair<Long, Integer> kvsAndCount = estimateTargetKvs(filesToCompact, config.getSplitCount());
|
||||
targetKvs = kvsAndCount.getFirst();
|
||||
targetCount = kvsAndCount.getSecond();
|
||||
splitString = "; the stripe will be split into at most "
|
||||
+ targetCount + " stripes with " + targetKvs + " target KVs";
|
||||
}
|
||||
|
||||
LOG.debug("Found compaction in a stripe with end key ["
|
||||
+ Bytes.toString(si.getEndRow(bqIndex)) + "], with "
|
||||
+ filesToCompact.size() + " files of total size " + bqTotalSize + splitString);
|
||||
|
||||
// See if we can drop deletes.
|
||||
StripeCompactionRequest req;
|
||||
if (includeL0) {
|
||||
assert hasAllFiles;
|
||||
List<StoreFile> l0Files = si.getLevel0Files();
|
||||
LOG.debug("Adding " + l0Files.size() + " files to compaction to be able to drop deletes");
|
||||
ConcatenatedLists<StoreFile> sfs = new ConcatenatedLists<StoreFile>();
|
||||
sfs.addSublist(filesToCompact);
|
||||
sfs.addSublist(l0Files);
|
||||
req = new BoundaryStripeCompactionRequest(sfs, si.getStripeBoundaries());
|
||||
} else {
|
||||
req = new SplitStripeCompactionRequest(
|
||||
filesToCompact, si.getStartRow(bqIndex), si.getEndRow(bqIndex), targetCount, targetKvs);
|
||||
}
|
||||
if (canDropDeletesWithoutL0 || includeL0) {
|
||||
req.setMajorRange(si.getStartRow(bqIndex), si.getEndRow(bqIndex));
|
||||
}
|
||||
req.getRequest().setOffPeak(isOffpeak);
|
||||
return req;
|
||||
}
|
||||
|
||||
/**
|
||||
* Selects the compaction of a single stripe using default policy.
|
||||
* @param sfs Files.
|
||||
* @param allFilesOnly Whether a compaction of all-or-none files is needed.
|
||||
* @return The resulting selection.
|
||||
*/
|
||||
private List<StoreFile> selectSimpleCompaction(
|
||||
List<StoreFile> sfs, boolean allFilesOnly, boolean isOffpeak) {
|
||||
int minFilesLocal = Math.max(
|
||||
allFilesOnly ? sfs.size() : 0, this.config.getStripeCompactMinFiles());
|
||||
int maxFilesLocal = Math.max(this.config.getStripeCompactMaxFiles(), minFilesLocal);
|
||||
return stripePolicy.applyCompactionPolicy(sfs, isOffpeak, false, minFilesLocal, maxFilesLocal);
|
||||
}
|
||||
|
||||
/**
|
||||
* Selects the compaction that compacts all files (to be removed later).
|
||||
* @param si StoreFileManager.
|
||||
* @param targetStripeCount Target stripe count.
|
||||
* @param targetSize Target stripe size.
|
||||
* @return The compaction.
|
||||
*/
|
||||
private StripeCompactionRequest selectCompactionOfAllFiles(StripeInformationProvider si,
|
||||
int targetStripeCount, long targetSize) {
|
||||
Collection<StoreFile> allFiles = si.getStorefiles();
|
||||
SplitStripeCompactionRequest request = new SplitStripeCompactionRequest(
|
||||
allFiles, OPEN_KEY, OPEN_KEY, targetStripeCount, targetSize);
|
||||
request.setMajorRangeFull();
|
||||
LOG.debug("Selecting a compaction that includes all " + allFiles.size() + " files");
|
||||
return request;
|
||||
}
|
||||
|
||||
private StripeCompactionRequest selectNewStripesCompaction(StripeInformationProvider si) {
|
||||
List<StoreFile> l0Files = si.getLevel0Files();
|
||||
Pair<Long, Integer> kvsAndCount = estimateTargetKvs(l0Files, config.getInitialCount());
|
||||
LOG.debug("Creating " + kvsAndCount.getSecond() + " initial stripes with "
|
||||
+ kvsAndCount.getFirst() + " kvs each via L0 compaction of " + l0Files.size() + " files");
|
||||
SplitStripeCompactionRequest request = new SplitStripeCompactionRequest(
|
||||
si.getLevel0Files(), OPEN_KEY, OPEN_KEY, kvsAndCount.getSecond(), kvsAndCount.getFirst());
|
||||
request.setMajorRangeFull(); // L0 only, can drop deletes.
|
||||
return request;
|
||||
}
|
||||
|
||||
private StripeCompactionRequest selectExpiredMergeCompaction(
|
||||
StripeInformationProvider si, boolean canDropDeletesNoL0) {
|
||||
long cfTtl = this.storeConfigInfo.getStoreFileTtl();
|
||||
if (cfTtl == Long.MAX_VALUE) {
|
||||
return null; // minversion might be set, cannot delete old files
|
||||
}
|
||||
long timestampCutoff = EnvironmentEdgeManager.currentTimeMillis() - cfTtl;
|
||||
// Merge the longest sequence of stripes where all files have expired, if any.
|
||||
int start = -1, bestStart = -1, length = 0, bestLength = 0;
|
||||
ArrayList<ImmutableList<StoreFile>> stripes = si.getStripes();
|
||||
OUTER: for (int i = 0; i < stripes.size(); ++i) {
|
||||
for (StoreFile storeFile : stripes.get(i)) {
|
||||
if (storeFile.getReader().getMaxTimestamp() < timestampCutoff) continue;
|
||||
// Found non-expired file, this stripe has to stay.
|
||||
if (length > bestLength) {
|
||||
bestStart = start;
|
||||
bestLength = length;
|
||||
}
|
||||
start = -1;
|
||||
length = 0;
|
||||
continue OUTER;
|
||||
}
|
||||
if (start == -1) {
|
||||
start = i;
|
||||
}
|
||||
++length;
|
||||
}
|
||||
if (length > bestLength) {
|
||||
bestStart = start;
|
||||
bestLength = length;
|
||||
}
|
||||
if (bestLength == 0) return null;
|
||||
if (bestLength == 1) {
|
||||
// This is currently inefficient. If only one stripe expired, we will rewrite some
|
||||
// entire stripe just to delete some expired files because we rely on metadata and it
|
||||
// cannot simply be updated in an old file. When we either determine stripe dynamically
|
||||
// or move metadata to manifest, we can just drop the "expired stripes".
|
||||
if (bestStart == (stripes.size() - 1)) return null;
|
||||
++bestLength;
|
||||
}
|
||||
LOG.debug("Merging " + bestLength + " stripes to delete expired store files");
|
||||
int endIndex = bestStart + bestLength - 1;
|
||||
ConcatenatedLists<StoreFile> sfs = new ConcatenatedLists<StoreFile>();
|
||||
sfs.addAllSublists(stripes.subList(bestStart, endIndex + 1));
|
||||
SplitStripeCompactionRequest result = new SplitStripeCompactionRequest(sfs,
|
||||
si.getStartRow(bestStart), si.getEndRow(endIndex), 1, Long.MAX_VALUE);
|
||||
if (canDropDeletesNoL0) {
|
||||
result.setMajorRangeFull();
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
private static long getTotalKvCount(final Collection<StoreFile> candidates) {
|
||||
long totalSize = 0;
|
||||
for (StoreFile storeFile : candidates) {
|
||||
totalSize += storeFile.getReader().getEntries();
|
||||
}
|
||||
return totalSize;
|
||||
}
|
||||
|
||||
private static long getTotalFileSize(final Collection<StoreFile> candidates) {
|
||||
long totalSize = 0;
|
||||
for (StoreFile storeFile : candidates) {
|
||||
totalSize += storeFile.getReader().length();
|
||||
}
|
||||
return totalSize;
|
||||
}
|
||||
|
||||
private Pair<Long, Integer> estimateTargetKvs(Collection<StoreFile> files, double splitCount) {
|
||||
// If the size is larger than what we target, we don't want to split into proportionally
|
||||
// larger parts and then have to split again very soon. So, we will increase the multiplier
|
||||
// by one until we get small enough parts. E.g. 5Gb stripe that should have been split into
|
||||
// 2 parts when it was 3Gb will be split into 3x1.67Gb parts, rather than 2x2.5Gb parts.
|
||||
long totalSize = getTotalFileSize(files);
|
||||
long targetPartSize = config.getSplitPartSize();
|
||||
assert targetPartSize > 0 && splitCount > 0;
|
||||
double ratio = totalSize / (splitCount * targetPartSize); // ratio of real to desired size
|
||||
while (ratio > 1.0) {
|
||||
// Ratio of real to desired size if we increase the multiplier.
|
||||
double newRatio = totalSize / ((splitCount + 1.0) * targetPartSize);
|
||||
if ((1.0 / newRatio) >= ratio) break; // New ratio is < 1.0, but further than the last one.
|
||||
ratio = newRatio;
|
||||
splitCount += 1.0;
|
||||
}
|
||||
long kvCount = (long)(getTotalKvCount(files) / splitCount);
|
||||
return new Pair<Long, Integer>(kvCount, (int)Math.ceil(splitCount));
|
||||
}
|
||||
|
||||
/** Stripe compaction request wrapper. */
|
||||
public abstract static class StripeCompactionRequest {
|
||||
protected CompactionRequest request;
|
||||
protected byte[] majorRangeFromRow = null, majorRangeToRow = null;
|
||||
|
||||
/**
|
||||
* Executes the request against compactor (essentially, just calls correct overload of
|
||||
* compact method), to simulate more dynamic dispatch.
|
||||
* @param compactor Compactor.
|
||||
* @return result of compact(...)
|
||||
*/
|
||||
public abstract List<Path> execute(StripeCompactor compactor);
|
||||
|
||||
public StripeCompactionRequest(CompactionRequest request) {
|
||||
this.request = request;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets compaction "major range". Major range is the key range for which all
|
||||
* the files are included, so they can be treated like major-compacted files.
|
||||
* @param startRow Left boundary, inclusive.
|
||||
* @param endRow Right boundary, exclusive.
|
||||
*/
|
||||
public void setMajorRange(byte[] startRow, byte[] endRow) {
|
||||
this.majorRangeFromRow = startRow;
|
||||
this.majorRangeToRow = endRow;
|
||||
}
|
||||
|
||||
public CompactionRequest getRequest() {
|
||||
return this.request;
|
||||
}
|
||||
|
||||
public void setRequest(CompactionRequest request) {
|
||||
assert request != null;
|
||||
this.request = request;
|
||||
this.majorRangeFromRow = this.majorRangeToRow = null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Request for stripe compactor that will cause it to split the source files into several
|
||||
* separate files at the provided boundaries.
|
||||
*/
|
||||
private static class BoundaryStripeCompactionRequest extends StripeCompactionRequest {
|
||||
private final List<byte[]> targetBoundaries;
|
||||
|
||||
/**
|
||||
* @param request Original request.
|
||||
* @param targetBoundaries New files should be written with these boundaries.
|
||||
*/
|
||||
public BoundaryStripeCompactionRequest(CompactionRequest request,
|
||||
List<byte[]> targetBoundaries) {
|
||||
super(request);
|
||||
this.targetBoundaries = targetBoundaries;
|
||||
}
|
||||
|
||||
public BoundaryStripeCompactionRequest(Collection<StoreFile> files,
|
||||
List<byte[]> targetBoundaries) {
|
||||
this(new CompactionRequest(files), targetBoundaries);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Path> execute(StripeCompactor compactor) {
|
||||
return compactor.compact(
|
||||
this.request, this.targetBoundaries, this.majorRangeFromRow, this.majorRangeToRow);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Request for stripe compactor that will cause it to split the source files into several
|
||||
* separate files into based on key-value count, as well as file count limit.
|
||||
* Most of the files will be roughly the same size. The last file may be smaller or larger
|
||||
* depending on the interplay of the amount of data and maximum number of files allowed.
|
||||
*/
|
||||
private static class SplitStripeCompactionRequest extends StripeCompactionRequest {
|
||||
private final byte[] startRow, endRow;
|
||||
private final int targetCount;
|
||||
private final long targetKvs;
|
||||
|
||||
/**
|
||||
* @param request Original request.
|
||||
* @param startRow Left boundary of the range to compact, inclusive.
|
||||
* @param endRow Right boundary of the range to compact, exclusive.
|
||||
* @param targetCount The maximum number of stripe to compact into.
|
||||
* @param targetKvs The KV count of each segment. If targetKvs*targetCount is less than
|
||||
* total number of kvs, all the overflow data goes into the last stripe.
|
||||
*/
|
||||
public SplitStripeCompactionRequest(CompactionRequest request,
|
||||
byte[] startRow, byte[] endRow, int targetCount, long targetKvs) {
|
||||
super(request);
|
||||
this.startRow = startRow;
|
||||
this.endRow = endRow;
|
||||
this.targetCount = targetCount;
|
||||
this.targetKvs = targetKvs;
|
||||
}
|
||||
|
||||
public SplitStripeCompactionRequest(
|
||||
CompactionRequest request, byte[] startRow, byte[] endRow, long targetKvs) {
|
||||
this(request, startRow, endRow, Integer.MAX_VALUE, targetKvs);
|
||||
}
|
||||
|
||||
public SplitStripeCompactionRequest(
|
||||
Collection<StoreFile> files, byte[] startRow, byte[] endRow, long targetKvs) {
|
||||
this(files, startRow, endRow, Integer.MAX_VALUE, targetKvs);
|
||||
}
|
||||
|
||||
public SplitStripeCompactionRequest(Collection<StoreFile> files,
|
||||
byte[] startRow, byte[] endRow, int targetCount, long targetKvs) {
|
||||
this(new CompactionRequest(files), startRow, endRow, targetCount, targetKvs);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Path> execute(StripeCompactor compactor) {
|
||||
return compactor.compact(this.request, this.targetCount, this.targetKvs,
|
||||
this.startRow, this.endRow, this.majorRangeFromRow, this.majorRangeToRow);
|
||||
}
|
||||
|
||||
/** Set major range of the compaction to the entire compaction range.
|
||||
* See {@link #setMajorRange(byte[], byte[])}. */
|
||||
public void setMajorRangeFull() {
|
||||
setMajorRange(this.startRow, this.endRow);
|
||||
}
|
||||
}
|
||||
|
||||
/** Helper class used to calculate size related things */
|
||||
private static class StripeSizes {
|
||||
public final ArrayList<Long> kvCounts;
|
||||
public final ArrayList<Long> fileSizes;
|
||||
public double avgKvCount = 0;
|
||||
public long minKvCount = Long.MAX_VALUE, maxKvCount = Long.MIN_VALUE;
|
||||
public int minIndex = -1, maxIndex = -1;
|
||||
|
||||
public StripeSizes(List<ImmutableList<StoreFile>> stripes) {
|
||||
assert !stripes.isEmpty();
|
||||
kvCounts = new ArrayList<Long>(stripes.size());
|
||||
fileSizes = new ArrayList<Long>(stripes.size());
|
||||
for (int i = 0; i < stripes.size(); ++i) {
|
||||
long kvCount = getTotalKvCount(stripes.get(i));
|
||||
fileSizes.add(getTotalFileSize(stripes.get(i)));
|
||||
kvCounts.add(kvCount);
|
||||
avgKvCount += (double)(kvCount - avgKvCount) / (i + 1);
|
||||
if (minKvCount > kvCount) {
|
||||
minIndex = i;
|
||||
minKvCount = kvCount;
|
||||
}
|
||||
if (maxKvCount < kvCount) {
|
||||
maxIndex = i;
|
||||
maxKvCount = kvCount;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/** The information about stripes that the policy needs to do its stuff */
|
||||
public static interface StripeInformationProvider {
|
||||
public Collection<StoreFile> getStorefiles();
|
||||
|
||||
/**
|
||||
* Gets the start row for a given stripe.
|
||||
* @param stripeIndex Stripe index.
|
||||
* @return Start row. May be an open key.
|
||||
*/
|
||||
public byte[] getStartRow(int stripeIndex);
|
||||
|
||||
/**
|
||||
* Gets the end row for a given stripe.
|
||||
* @param stripeIndex Stripe index.
|
||||
* @return End row. May be an open key.
|
||||
*/
|
||||
public byte[] getEndRow(int stripeIndex);
|
||||
|
||||
/**
|
||||
* @return Level 0 files.
|
||||
*/
|
||||
public List<StoreFile> getLevel0Files();
|
||||
|
||||
/**
|
||||
* @return All stripe boundaries; including the open ones on both ends.
|
||||
*/
|
||||
public List<byte[]> getStripeBoundaries();
|
||||
|
||||
/**
|
||||
* @return The stripes.
|
||||
*/
|
||||
public ArrayList<ImmutableList<StoreFile>> getStripes();
|
||||
|
||||
/**
|
||||
* @return Stripe count.
|
||||
*/
|
||||
public int getStripeCount();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,46 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.regionserver.compactions;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.commons.lang.NotImplementedException;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.regionserver.Store;
|
||||
|
||||
/**
|
||||
* This is the placeholder for stripe compactor. The implementation,
|
||||
* as well as the proper javadoc, will be added in HBASE-7967.
|
||||
*/
|
||||
public class StripeCompactor extends Compactor {
|
||||
|
||||
public StripeCompactor(Configuration conf, final Store store) {
|
||||
super(conf, store);
|
||||
}
|
||||
|
||||
public List<Path> compact(CompactionRequest request, List<byte[]> targetBoundaries,
|
||||
byte[] dropDeletesFromRow, byte[] dropDeletesToRow) {
|
||||
throw new NotImplementedException();
|
||||
}
|
||||
|
||||
public List<Path> compact(CompactionRequest request, int targetCount, long targetSize,
|
||||
byte[] left, byte[] right, byte[] dropDeletesFromRow, byte[] dropDeletesToRow) {
|
||||
throw new NotImplementedException();
|
||||
}
|
||||
}
|
|
@ -73,6 +73,7 @@ import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
|
|||
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.Compactor;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.RatioBasedCompactionPolicy;
|
||||
import org.apache.hadoop.hbase.regionserver.wal.HLog;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
|
@ -644,7 +645,7 @@ public class TestCompaction {
|
|||
HStore store = (HStore) r.getStore(COLUMN_FAMILY);
|
||||
|
||||
Collection<StoreFile> storeFiles = store.getStorefiles();
|
||||
Compactor tool = store.storeEngine.getCompactor();
|
||||
DefaultCompactor tool = (DefaultCompactor)store.storeEngine.getCompactor();
|
||||
|
||||
List<Path> newFiles = tool.compactForTesting(storeFiles, false);
|
||||
|
||||
|
|
|
@ -0,0 +1,108 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.regionserver;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
import static org.mockito.Mockito.*;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||
import org.apache.hadoop.hbase.KeyValue.KVComparator;
|
||||
import org.apache.hadoop.hbase.SmallTests;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactionPolicy;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactor;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
||||
@Category(SmallTests.class)
|
||||
public class TestStripeStoreEngine {
|
||||
|
||||
@Test
|
||||
public void testCreateBasedOnConfig() throws Exception {
|
||||
Configuration conf = HBaseConfiguration.create();
|
||||
conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, TestStoreEngine.class.getName());
|
||||
StripeStoreEngine se = createEngine(conf);
|
||||
assertTrue(se.getCompactionPolicy() instanceof StripeCompactionPolicy);
|
||||
}
|
||||
|
||||
public static class TestStoreEngine extends StripeStoreEngine {
|
||||
public void setCompactorOverride(StripeCompactor compactorOverride) {
|
||||
this.compactor = compactorOverride;
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCompactionContextForceSelect() throws Exception {
|
||||
Configuration conf = HBaseConfiguration.create();
|
||||
int targetCount = 2;
|
||||
conf.setInt(StripeStoreConfig.INITIAL_STRIPE_COUNT_KEY, targetCount);
|
||||
conf.setInt(StripeStoreConfig.MIN_FILES_L0_KEY, 2);
|
||||
conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, TestStoreEngine.class.getName());
|
||||
TestStoreEngine se = createEngine(conf);
|
||||
StripeCompactor mockCompactor = mock(StripeCompactor.class);
|
||||
se.setCompactorOverride(mockCompactor);
|
||||
when(mockCompactor.compact(any(CompactionRequest.class), anyInt(), anyLong(),
|
||||
any(byte[].class), any(byte[].class), any(byte[].class), any(byte[].class)))
|
||||
.thenReturn(new ArrayList<Path>());
|
||||
|
||||
// Produce 3 L0 files.
|
||||
StoreFile sf = createFile();
|
||||
ArrayList<StoreFile> compactUs = al(sf, createFile(), createFile());
|
||||
se.getStoreFileManager().loadFiles(compactUs);
|
||||
// Create a compaction that would want to split the stripe.
|
||||
CompactionContext compaction = se.createCompaction();
|
||||
compaction.select(al(), false, false, false);
|
||||
assertEquals(3, compaction.getRequest().getFiles().size());
|
||||
// Override the file list. Granted, overriding this compaction in this manner will
|
||||
// break things in real world, but we only want to verify the override.
|
||||
compactUs.remove(sf);
|
||||
CompactionRequest req = new CompactionRequest(compactUs);
|
||||
compaction.forceSelect(req);
|
||||
assertEquals(2, compaction.getRequest().getFiles().size());
|
||||
assertFalse(compaction.getRequest().getFiles().contains(sf));
|
||||
// Make sure the correct method it called on compactor.
|
||||
compaction.compact();
|
||||
verify(mockCompactor, times(1)).compact(compaction.getRequest(), targetCount, 0L,
|
||||
StripeStoreFileManager.OPEN_KEY, StripeStoreFileManager.OPEN_KEY, null, null);
|
||||
}
|
||||
|
||||
private static StoreFile createFile() throws Exception {
|
||||
StoreFile sf = mock(StoreFile.class);
|
||||
when(sf.getMetadataValue(any(byte[].class)))
|
||||
.thenReturn(StripeStoreFileManager.INVALID_KEY);
|
||||
when(sf.getReader()).thenReturn(mock(StoreFile.Reader.class));
|
||||
when(sf.getPath()).thenReturn(new Path("moo"));
|
||||
return sf;
|
||||
}
|
||||
|
||||
private static TestStoreEngine createEngine(Configuration conf) throws Exception {
|
||||
Store store = mock(Store.class);
|
||||
KVComparator kvComparator = mock(KVComparator.class);
|
||||
return (TestStoreEngine)StoreEngine.create(store, conf, kvComparator);
|
||||
}
|
||||
|
||||
private static ArrayList<StoreFile> al(StoreFile... sfs) {
|
||||
return new ArrayList<StoreFile>(Arrays.asList(sfs));
|
||||
}
|
||||
}
|
|
@ -43,6 +43,7 @@ import org.junit.After;
|
|||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
|
||||
@Category(SmallTests.class)
|
||||
|
@ -237,7 +238,7 @@ public class TestStripeStoreFileManager {
|
|||
|
||||
Configuration conf = HBaseConfiguration.create();
|
||||
if (splitRatioToVerify != 0) {
|
||||
conf.setFloat(StripeStoreConfig.MAX_SPLIT_IMBALANCE, splitRatioToVerify);
|
||||
conf.setFloat(StripeStoreConfig.MAX_REGION_SPLIT_IMBALANCE_KEY, splitRatioToVerify);
|
||||
}
|
||||
StripeStoreFileManager manager = createManager(al(), conf);
|
||||
manager.addCompactionResults(al(), sfs);
|
||||
|
@ -536,6 +537,7 @@ public class TestStripeStoreFileManager {
|
|||
verifyGetOrScanScenario(manager, false, null, null, results);
|
||||
}
|
||||
|
||||
// TODO: replace with Mockito?
|
||||
private static MockStoreFile createFile(
|
||||
long size, long seqNum, byte[] startKey, byte[] endKey) throws Exception {
|
||||
FileSystem fs = TEST_UTIL.getTestFileSystem();
|
||||
|
@ -573,7 +575,9 @@ public class TestStripeStoreFileManager {
|
|||
|
||||
private static StripeStoreFileManager createManager(
|
||||
ArrayList<StoreFile> sfs, Configuration conf) throws Exception {
|
||||
StripeStoreFileManager result = new StripeStoreFileManager(new KVComparator(), conf);
|
||||
StripeStoreConfig config = new StripeStoreConfig(
|
||||
conf, Mockito.mock(StoreConfigInformation.class));
|
||||
StripeStoreFileManager result = new StripeStoreFileManager(new KVComparator(), conf, config);
|
||||
result.loadFiles(sfs);
|
||||
return result;
|
||||
}
|
||||
|
|
|
@ -0,0 +1,583 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.regionserver.compactions;
|
||||
|
||||
import static org.apache.hadoop.hbase.regionserver.StripeStoreFileManager.OPEN_KEY;
|
||||
import static org.junit.Assert.*;
|
||||
import static org.mockito.AdditionalMatchers.aryEq;
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Matchers.anyInt;
|
||||
import static org.mockito.Matchers.anyLong;
|
||||
import static org.mockito.Matchers.argThat;
|
||||
import static org.mockito.Matchers.eq;
|
||||
import static org.mockito.Matchers.isNull;
|
||||
import static org.mockito.Mockito.*;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.commons.lang.NotImplementedException;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||
import org.apache.hadoop.hbase.SmallTests;
|
||||
import org.apache.hadoop.hbase.regionserver.StoreConfigInformation;
|
||||
import org.apache.hadoop.hbase.regionserver.StoreFile;
|
||||
import org.apache.hadoop.hbase.regionserver.StripeStoreConfig;
|
||||
import org.apache.hadoop.hbase.regionserver.StripeStoreFileManager;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactionPolicy.StripeInformationProvider;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.ConcatenatedLists;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
import org.mockito.ArgumentMatcher;
|
||||
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.Lists;
|
||||
|
||||
@Category(SmallTests.class)
|
||||
public class TestStripeCompactionPolicy {
|
||||
private static final byte[] KEY_A = Bytes.toBytes("aaa");
|
||||
private static final byte[] KEY_B = Bytes.toBytes("bbb");
|
||||
private static final byte[] KEY_C = Bytes.toBytes("ccc");
|
||||
private static final byte[] KEY_D = Bytes.toBytes("ddd");
|
||||
private static final byte[] KEY_E = Bytes.toBytes("eee");
|
||||
|
||||
private static long defaultSplitSize = 18;
|
||||
private static float defaultSplitCount = 1.8F;
|
||||
private final static int defaultInitialCount = 1;
|
||||
private static long defaultTtl = 1000 * 1000;
|
||||
|
||||
@Test
|
||||
public void testSingleStripeCompaction() throws Exception {
|
||||
// Create a special policy that only compacts single stripes, using standard methods.
|
||||
Configuration conf = HBaseConfiguration.create();
|
||||
conf.setFloat(CompactionConfiguration.RATIO_KEY, 1.0F);
|
||||
conf.setInt(StripeStoreConfig.MIN_FILES_KEY, 3);
|
||||
conf.setInt(StripeStoreConfig.MAX_FILES_KEY, 4);
|
||||
conf.setLong(StripeStoreConfig.SIZE_TO_SPLIT_KEY, 1000); // make sure the are no splits
|
||||
StoreConfigInformation sci = mock(StoreConfigInformation.class);
|
||||
StripeStoreConfig ssc = new StripeStoreConfig(conf, sci);
|
||||
StripeCompactionPolicy policy = new StripeCompactionPolicy(conf, sci, ssc) {
|
||||
@Override
|
||||
public StripeCompactionRequest selectCompaction(StripeInformationProvider si,
|
||||
List<StoreFile> filesCompacting, boolean isOffpeak) throws IOException {
|
||||
if (!filesCompacting.isEmpty()) return null;
|
||||
return selectSingleStripeCompaction(si, false, false, isOffpeak);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean needsCompactions(
|
||||
StripeInformationProvider si, List<StoreFile> filesCompacting) {
|
||||
if (!filesCompacting.isEmpty()) return false;
|
||||
return needsSingleStripeCompaction(si);
|
||||
}
|
||||
};
|
||||
|
||||
// No compaction due to min files or ratio
|
||||
StripeInformationProvider si = createStripesWithSizes(0, 0,
|
||||
new Long[] { 2L }, new Long[] { 3L, 3L }, new Long[] { 5L, 1L });
|
||||
verifyNoCompaction(policy, si);
|
||||
// No compaction due to min files or ratio - will report needed, but not do any.
|
||||
si = createStripesWithSizes(0, 0,
|
||||
new Long[] { 2L }, new Long[] { 3L, 3L }, new Long[] { 5L, 1L, 1L });
|
||||
assertNull(policy.selectCompaction(si, al(), false));
|
||||
assertTrue(policy.needsCompactions(si, al()));
|
||||
// One stripe has possible compaction
|
||||
si = createStripesWithSizes(0, 0,
|
||||
new Long[] { 2L }, new Long[] { 3L, 3L }, new Long[] { 5L, 4L, 3L });
|
||||
verifySingleStripeCompaction(policy, si, 2, null);
|
||||
// Several stripes have possible compactions; choose best quality (removes most files)
|
||||
si = createStripesWithSizes(0, 0,
|
||||
new Long[] { 3L, 2L, 2L }, new Long[] { 2L, 2L, 1L }, new Long[] { 3L, 2L, 2L, 1L });
|
||||
verifySingleStripeCompaction(policy, si, 2, null);
|
||||
si = createStripesWithSizes(0, 0,
|
||||
new Long[] { 5L }, new Long[] { 3L, 2L, 2L, 1L }, new Long[] { 3L, 2L, 2L });
|
||||
verifySingleStripeCompaction(policy, si, 1, null);
|
||||
// Or with smallest files, if the count is the same
|
||||
si = createStripesWithSizes(0, 0,
|
||||
new Long[] { 3L, 3L, 3L }, new Long[] { 3L, 1L, 2L }, new Long[] { 3L, 2L, 2L });
|
||||
verifySingleStripeCompaction(policy, si, 1, null);
|
||||
// Verify max count is respected.
|
||||
si = createStripesWithSizes(0, 0, new Long[] { 5L }, new Long[] { 5L, 4L, 4L, 4L, 4L });
|
||||
List<StoreFile> sfs = si.getStripes().get(1).subList(1, 5);
|
||||
verifyCompaction(policy, si, sfs, null, 1, null, si.getStartRow(1), si.getEndRow(1), true);
|
||||
// Verify ratio is applied.
|
||||
si = createStripesWithSizes(0, 0, new Long[] { 5L }, new Long[] { 50L, 4L, 4L, 4L, 4L });
|
||||
sfs = si.getStripes().get(1).subList(1, 5);
|
||||
verifyCompaction(policy, si, sfs, null, 1, null, si.getStartRow(1), si.getEndRow(1), true);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testWithParallelCompaction() throws Exception {
|
||||
// TODO: currently only one compaction at a time per store is allowed. If this changes,
|
||||
// the appropriate file exclusion testing would need to be done in respective tests.
|
||||
assertNull(createPolicy(HBaseConfiguration.create()).selectCompaction(
|
||||
mock(StripeInformationProvider.class), al(createFile()), false));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testWithReferences() throws Exception {
|
||||
StripeCompactionPolicy policy = createPolicy(HBaseConfiguration.create());
|
||||
StripeCompactor sc = mock(StripeCompactor.class);
|
||||
StoreFile ref = createFile();
|
||||
when(ref.isReference()).thenReturn(true);
|
||||
StripeInformationProvider si = mock(StripeInformationProvider.class);
|
||||
Collection<StoreFile> sfs = al(ref, createFile());
|
||||
when(si.getStorefiles()).thenReturn(sfs);
|
||||
|
||||
assertTrue(policy.needsCompactions(si, al()));
|
||||
StripeCompactionPolicy.StripeCompactionRequest scr = policy.selectCompaction(si, al(), false);
|
||||
assertEquals(si.getStorefiles(), scr.getRequest().getFiles());
|
||||
scr.execute(sc);
|
||||
verify(sc, only()).compact(eq(scr.getRequest()), anyInt(), anyLong(),
|
||||
aryEq(OPEN_KEY), aryEq(OPEN_KEY), aryEq(OPEN_KEY), aryEq(OPEN_KEY));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInitialCountFromL0() throws Exception {
|
||||
Configuration conf = HBaseConfiguration.create();
|
||||
conf.setInt(StripeStoreConfig.MIN_FILES_L0_KEY, 2);
|
||||
StripeCompactionPolicy policy = createPolicy(
|
||||
conf, defaultSplitSize, defaultSplitCount, 2, false);
|
||||
StripeCompactionPolicy.StripeInformationProvider si = createStripesL0Only(3, 8);
|
||||
verifyCompaction(policy, si, si.getStorefiles(), true, 2, 12L, OPEN_KEY, OPEN_KEY, true);
|
||||
si = createStripesL0Only(3, 10); // If result would be too large, split into smaller parts.
|
||||
verifyCompaction(policy, si, si.getStorefiles(), true, 3, 10L, OPEN_KEY, OPEN_KEY, true);
|
||||
policy = createPolicy(conf, defaultSplitSize, defaultSplitCount, 6, false);
|
||||
verifyCompaction(policy, si, si.getStorefiles(), true, 6, 5L, OPEN_KEY, OPEN_KEY, true);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testExistingStripesFromL0() throws Exception {
|
||||
Configuration conf = HBaseConfiguration.create();
|
||||
conf.setInt(StripeStoreConfig.MIN_FILES_L0_KEY, 3);
|
||||
StripeCompactionPolicy.StripeInformationProvider si = createStripes(3, KEY_A);
|
||||
verifyCompaction(
|
||||
createPolicy(conf), si, si.getLevel0Files(), null, null, si.getStripeBoundaries());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNothingToCompactFromL0() throws Exception {
|
||||
Configuration conf = HBaseConfiguration.create();
|
||||
conf.setInt(StripeStoreConfig.MIN_FILES_L0_KEY, 4);
|
||||
StripeCompactionPolicy.StripeInformationProvider si = createStripesL0Only(3, 10);
|
||||
StripeCompactionPolicy policy = createPolicy(conf);
|
||||
verifyNoCompaction(policy, si);
|
||||
|
||||
si = createStripes(3, KEY_A);
|
||||
verifyNoCompaction(policy, si);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSplitOffStripe() throws Exception {
|
||||
Configuration conf = HBaseConfiguration.create();
|
||||
// First test everything with default split count of 2, then split into more.
|
||||
conf.setInt(StripeStoreConfig.MIN_FILES_KEY, 2);
|
||||
Long[] toSplit = new Long[] { defaultSplitSize - 2, 1L, 1L };
|
||||
Long[] noSplit = new Long[] { defaultSplitSize - 2, 1L };
|
||||
long splitTargetSize = (long)(defaultSplitSize / defaultSplitCount);
|
||||
// Don't split if not eligible for compaction.
|
||||
StripeCompactionPolicy.StripeInformationProvider si =
|
||||
createStripesWithSizes(0, 0, new Long[] { defaultSplitSize - 2, 2L });
|
||||
assertNull(createPolicy(conf).selectCompaction(si, al(), false));
|
||||
// Make sure everything is eligible.
|
||||
conf.setFloat(CompactionConfiguration.RATIO_KEY, 500f);
|
||||
StripeCompactionPolicy policy = createPolicy(conf);
|
||||
verifyWholeStripesCompaction(policy, si, 0, 0, null, 2, splitTargetSize);
|
||||
// Add some extra stripes...
|
||||
si = createStripesWithSizes(0, 0, noSplit, noSplit, toSplit);
|
||||
verifyWholeStripesCompaction(policy, si, 2, 2, null, 2, splitTargetSize);
|
||||
// In the middle.
|
||||
si = createStripesWithSizes(0, 0, noSplit, toSplit, noSplit);
|
||||
verifyWholeStripesCompaction(policy, si, 1, 1, null, 2, splitTargetSize);
|
||||
// No split-off with different config (larger split size).
|
||||
// However, in this case some eligible stripe will just be compacted alone.
|
||||
StripeCompactionPolicy specPolicy = createPolicy(
|
||||
conf, defaultSplitSize + 1, defaultSplitCount, defaultInitialCount, false);
|
||||
verifySingleStripeCompaction(specPolicy, si, 1, null);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSplitOffStripeDropDeletes() throws Exception {
|
||||
Configuration conf = HBaseConfiguration.create();
|
||||
conf.setInt(StripeStoreConfig.MIN_FILES_KEY, 2);
|
||||
StripeCompactionPolicy policy = createPolicy(conf);
|
||||
Long[] toSplit = new Long[] { defaultSplitSize / 2, defaultSplitSize / 2 };
|
||||
Long[] noSplit = new Long[] { 1L };
|
||||
long splitTargetSize = (long)(defaultSplitSize / defaultSplitCount);
|
||||
|
||||
// Verify the deletes can be dropped if there are no L0 files.
|
||||
StripeCompactionPolicy.StripeInformationProvider si =
|
||||
createStripesWithSizes(0, 0, noSplit, toSplit);
|
||||
verifyWholeStripesCompaction(policy, si, 1, 1, true, null, splitTargetSize);
|
||||
// But cannot be dropped if there are.
|
||||
si = createStripesWithSizes(2, 2, noSplit, toSplit);
|
||||
verifyWholeStripesCompaction(policy, si, 1, 1, false, null, splitTargetSize);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Test
|
||||
public void testMergeExpiredFiles() throws Exception {
|
||||
ManualEnvironmentEdge edge = new ManualEnvironmentEdge();
|
||||
long now = defaultTtl + 2;
|
||||
edge.setValue(now);
|
||||
EnvironmentEdgeManager.injectEdge(edge);
|
||||
try {
|
||||
StoreFile expiredFile = createFile(), notExpiredFile = createFile();
|
||||
when(expiredFile.getReader().getMaxTimestamp()).thenReturn(now - defaultTtl - 1);
|
||||
when(notExpiredFile.getReader().getMaxTimestamp()).thenReturn(now - defaultTtl + 1);
|
||||
List<StoreFile> expired = Lists.newArrayList(expiredFile, expiredFile);
|
||||
List<StoreFile> notExpired = Lists.newArrayList(notExpiredFile, notExpiredFile);
|
||||
List<StoreFile> mixed = Lists.newArrayList(expiredFile, notExpiredFile);
|
||||
|
||||
StripeCompactionPolicy policy = createPolicy(HBaseConfiguration.create(),
|
||||
defaultSplitSize, defaultSplitCount, defaultInitialCount, true);
|
||||
// Merge expired if there are eligible stripes.
|
||||
StripeCompactionPolicy.StripeInformationProvider si =
|
||||
createStripesWithFiles(expired, expired, expired);
|
||||
verifyWholeStripesCompaction(policy, si, 0, 2, null, 1, Long.MAX_VALUE, false);
|
||||
// Don't merge if nothing expired.
|
||||
si = createStripesWithFiles(notExpired, notExpired, notExpired);
|
||||
assertNull(policy.selectCompaction(si, al(), false));
|
||||
// Merge one expired stripe with next.
|
||||
si = createStripesWithFiles(notExpired, expired, notExpired);
|
||||
verifyWholeStripesCompaction(policy, si, 1, 2, null, 1, Long.MAX_VALUE, false);
|
||||
// Merge the biggest run out of multiple options.
|
||||
// Merge one expired stripe with next.
|
||||
si = createStripesWithFiles(notExpired, expired, notExpired, expired, expired, notExpired);
|
||||
verifyWholeStripesCompaction(policy, si, 3, 4, null, 1, Long.MAX_VALUE, false);
|
||||
// Stripe with a subset of expired files is not merged.
|
||||
si = createStripesWithFiles(expired, expired, notExpired, expired, mixed);
|
||||
verifyWholeStripesCompaction(policy, si, 0, 1, null, 1, Long.MAX_VALUE, false);
|
||||
} finally {
|
||||
EnvironmentEdgeManager.reset();
|
||||
}
|
||||
}
|
||||
|
||||
private static StripeCompactionPolicy.StripeInformationProvider createStripesWithFiles(
|
||||
List<StoreFile>... stripeFiles) throws Exception {
|
||||
return createStripesWithFiles(createBoundaries(stripeFiles.length),
|
||||
Lists.newArrayList(stripeFiles), new ArrayList<StoreFile>());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSingleStripeDropDeletes() throws Exception {
|
||||
Configuration conf = HBaseConfiguration.create();
|
||||
StripeCompactionPolicy policy = createPolicy(conf);
|
||||
// Verify the deletes can be dropped if there are no L0 files.
|
||||
Long[][] stripes = new Long[][] { new Long[] { 3L, 2L, 2L }, new Long[] { 6L } };
|
||||
StripeInformationProvider si = createStripesWithSizes(0, 0, stripes);
|
||||
verifySingleStripeCompaction(policy, si, 0, true);
|
||||
// But cannot be dropped if there are.
|
||||
si = createStripesWithSizes(2, 2, stripes);
|
||||
verifySingleStripeCompaction(policy, si, 0, false);
|
||||
// Unless there are enough to cause L0 compaction.
|
||||
si = createStripesWithSizes(6, 2, stripes);
|
||||
ConcatenatedLists<StoreFile> sfs = new ConcatenatedLists<StoreFile>();
|
||||
sfs.addSublist(si.getLevel0Files());
|
||||
sfs.addSublist(si.getStripes().get(0));
|
||||
verifyCompaction(
|
||||
policy, si, sfs, si.getStartRow(0), si.getEndRow(0), si.getStripeBoundaries());
|
||||
// If we cannot actually compact all files in some stripe, L0 is chosen.
|
||||
si = createStripesWithSizes(6, 2,
|
||||
new Long[][] { new Long[] { 10L, 1L, 1L, 1L, 1L }, new Long[] { 12L } });
|
||||
verifyCompaction(policy, si, si.getLevel0Files(), null, null, si.getStripeBoundaries());
|
||||
}
|
||||
|
||||
/********* HELPER METHODS ************/
|
||||
private static StripeCompactionPolicy createPolicy(
|
||||
Configuration conf) throws Exception {
|
||||
return createPolicy(conf, defaultSplitSize, defaultSplitCount, defaultInitialCount, false);
|
||||
}
|
||||
|
||||
private static StripeCompactionPolicy createPolicy(Configuration conf,
|
||||
long splitSize, float splitCount, int initialCount, boolean hasTtl) throws Exception {
|
||||
conf.setLong(StripeStoreConfig.SIZE_TO_SPLIT_KEY, splitSize);
|
||||
conf.setFloat(StripeStoreConfig.SPLIT_PARTS_KEY, splitCount);
|
||||
conf.setInt(StripeStoreConfig.INITIAL_STRIPE_COUNT_KEY, initialCount);
|
||||
StoreConfigInformation sci = mock(StoreConfigInformation.class);
|
||||
when(sci.getStoreFileTtl()).thenReturn(hasTtl ? defaultTtl : Long.MAX_VALUE);
|
||||
StripeStoreConfig ssc = new StripeStoreConfig(conf, sci);
|
||||
return new StripeCompactionPolicy(conf, sci, ssc);
|
||||
}
|
||||
|
||||
private static ArrayList<StoreFile> al(StoreFile... sfs) {
|
||||
return new ArrayList<StoreFile>(Arrays.asList(sfs));
|
||||
}
|
||||
|
||||
/**
|
||||
* Verify the compaction that includes several entire stripes.
|
||||
* @param policy Policy to test.
|
||||
* @param si Stripe information pre-set with stripes to test.
|
||||
* @param from Starting stripe.
|
||||
* @param to Ending stripe (inclusive).
|
||||
* @param dropDeletes Whether to drop deletes from compaction range.
|
||||
* @param count Expected # of resulting stripes, null if not checked.
|
||||
* @param size Expected target stripe size, null if not checked.
|
||||
*/
|
||||
private void verifyWholeStripesCompaction(StripeCompactionPolicy policy,
|
||||
StripeInformationProvider si, int from, int to, Boolean dropDeletes,
|
||||
Integer count, Long size, boolean needsCompaction) throws IOException {
|
||||
verifyCompaction(policy, si, getAllFiles(si, from, to), dropDeletes,
|
||||
count, size, si.getStartRow(from), si.getEndRow(to), needsCompaction);
|
||||
}
|
||||
|
||||
private void verifyWholeStripesCompaction(StripeCompactionPolicy policy,
|
||||
StripeInformationProvider si, int from, int to, Boolean dropDeletes,
|
||||
Integer count, Long size) throws IOException {
|
||||
verifyWholeStripesCompaction(policy, si, from, to, dropDeletes, count, size, true);
|
||||
}
|
||||
|
||||
private void verifySingleStripeCompaction(StripeCompactionPolicy policy,
|
||||
StripeInformationProvider si, int index, Boolean dropDeletes) throws IOException {
|
||||
verifyWholeStripesCompaction(policy, si, index, index, dropDeletes, 1, null, true);
|
||||
}
|
||||
|
||||
/**
|
||||
* Verify no compaction is needed or selected.
|
||||
* @param policy Policy to test.
|
||||
* @param si Stripe information pre-set with stripes to test.
|
||||
*/
|
||||
private void verifyNoCompaction(
|
||||
StripeCompactionPolicy policy, StripeInformationProvider si) throws IOException {
|
||||
assertNull(policy.selectCompaction(si, al(), false));
|
||||
assertFalse(policy.needsCompactions(si, al()));
|
||||
}
|
||||
|
||||
/**
|
||||
* Verify arbitrary compaction.
|
||||
* @param policy Policy to test.
|
||||
* @param si Stripe information pre-set with stripes to test.
|
||||
* @param sfs Files that should be compacted.
|
||||
* @param dropDeletesFrom Row from which to drop deletes.
|
||||
* @param dropDeletesTo Row to which to drop deletes.
|
||||
* @param boundaries Expected target stripe boundaries.
|
||||
*/
|
||||
private void verifyCompaction(StripeCompactionPolicy policy, StripeInformationProvider si,
|
||||
Collection<StoreFile> sfs, byte[] dropDeletesFrom, byte[] dropDeletesTo,
|
||||
final List<byte[]> boundaries) throws Exception {
|
||||
StripeCompactor sc = mock(StripeCompactor.class);
|
||||
assertTrue(policy.needsCompactions(si, al()));
|
||||
StripeCompactionPolicy.StripeCompactionRequest scr = policy.selectCompaction(si, al(), false);
|
||||
verifyCollectionsEqual(sfs, scr.getRequest().getFiles());
|
||||
scr.execute(sc);
|
||||
verify(sc, times(1)).compact(eq(scr.getRequest()), argThat(
|
||||
new ArgumentMatcher<List<byte[]>>() {
|
||||
@Override
|
||||
public boolean matches(Object argument) {
|
||||
@SuppressWarnings("unchecked")
|
||||
List<byte[]> other = (List<byte[]>)argument;
|
||||
if (other.size() != boundaries.size()) return false;
|
||||
for (int i = 0; i < other.size(); ++i) {
|
||||
if (!Bytes.equals(other.get(i), boundaries.get(i))) return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
}),
|
||||
dropDeletesFrom == null ? isNull(byte[].class) : aryEq(dropDeletesFrom),
|
||||
dropDeletesTo == null ? isNull(byte[].class) : aryEq(dropDeletesTo));
|
||||
}
|
||||
|
||||
/**
|
||||
* Verify arbitrary compaction.
|
||||
* @param policy Policy to test.
|
||||
* @param si Stripe information pre-set with stripes to test.
|
||||
* @param sfs Files that should be compacted.
|
||||
* @param dropDeletes Whether to drop deletes from compaction range.
|
||||
* @param count Expected # of resulting stripes, null if not checked.
|
||||
* @param size Expected target stripe size, null if not checked.
|
||||
* @param start Left boundary of the compaction.
|
||||
* @param righr Right boundary of the compaction.
|
||||
*/
|
||||
private void verifyCompaction(StripeCompactionPolicy policy, StripeInformationProvider si,
|
||||
Collection<StoreFile> sfs, Boolean dropDeletes, Integer count, Long size,
|
||||
byte[] start, byte[] end, boolean needsCompaction) throws IOException {
|
||||
StripeCompactor sc = mock(StripeCompactor.class);
|
||||
assertTrue(!needsCompaction || policy.needsCompactions(si, al()));
|
||||
StripeCompactionPolicy.StripeCompactionRequest scr = policy.selectCompaction(si, al(), false);
|
||||
verifyCollectionsEqual(sfs, scr.getRequest().getFiles());
|
||||
scr.execute(sc);
|
||||
verify(sc, times(1)).compact(eq(scr.getRequest()),
|
||||
count == null ? anyInt() : eq(count.intValue()),
|
||||
size == null ? anyLong() : eq(size.longValue()), aryEq(start), aryEq(end),
|
||||
dropDeletesMatcher(dropDeletes, start), dropDeletesMatcher(dropDeletes, end));
|
||||
}
|
||||
|
||||
private byte[] dropDeletesMatcher(Boolean dropDeletes, byte[] value) {
|
||||
return dropDeletes == null ? any(byte[].class)
|
||||
: (dropDeletes.booleanValue() ? aryEq(value) : isNull(byte[].class));
|
||||
}
|
||||
|
||||
private void verifyCollectionsEqual(Collection<StoreFile> sfs, Collection<StoreFile> scr) {
|
||||
// Dumb.
|
||||
assertEquals(sfs.size(), scr.size());
|
||||
assertTrue(scr.containsAll(sfs));
|
||||
}
|
||||
|
||||
private static List<StoreFile> getAllFiles(
|
||||
StripeInformationProvider si, int fromStripe, int toStripe) {
|
||||
ArrayList<StoreFile> expected = new ArrayList<StoreFile>();
|
||||
for (int i = fromStripe; i <= toStripe; ++i) {
|
||||
expected.addAll(si.getStripes().get(i));
|
||||
}
|
||||
return expected;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param l0Count Number of L0 files.
|
||||
* @param boundaries Target boundaries.
|
||||
* @return Mock stripes.
|
||||
*/
|
||||
private static StripeInformationProvider createStripes(
|
||||
int l0Count, byte[]... boundaries) throws Exception {
|
||||
List<Long> l0Sizes = new ArrayList<Long>();
|
||||
for (int i = 0; i < l0Count; ++i) {
|
||||
l0Sizes.add(5L);
|
||||
}
|
||||
List<List<Long>> sizes = new ArrayList<List<Long>>();
|
||||
for (int i = 0; i <= boundaries.length; ++i) {
|
||||
sizes.add(Arrays.asList(Long.valueOf(5)));
|
||||
}
|
||||
return createStripes(Arrays.asList(boundaries), sizes, l0Sizes);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param l0Count Number of L0 files.
|
||||
* @param l0Size Size of each file.
|
||||
* @return Mock stripes.
|
||||
*/
|
||||
private static StripeInformationProvider createStripesL0Only(
|
||||
int l0Count, long l0Size) throws Exception {
|
||||
List<Long> l0Sizes = new ArrayList<Long>();
|
||||
for (int i = 0; i < l0Count; ++i) {
|
||||
l0Sizes.add(l0Size);
|
||||
}
|
||||
return createStripes(null, new ArrayList<List<Long>>(), l0Sizes);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param l0Count Number of L0 files.
|
||||
* @param l0Size Size of each file.
|
||||
* @param sizes Sizes of the files; each sub-array representing a stripe.
|
||||
* @return Mock stripes.
|
||||
*/
|
||||
private static StripeInformationProvider createStripesWithSizes(
|
||||
int l0Count, long l0Size, Long[]... sizes) throws Exception {
|
||||
ArrayList<List<Long>> sizeList = new ArrayList<List<Long>>();
|
||||
for (Long[] size : sizes) {
|
||||
sizeList.add(Arrays.asList(size));
|
||||
}
|
||||
return createStripesWithSizes(l0Count, l0Size, sizeList);
|
||||
}
|
||||
|
||||
private static StripeInformationProvider createStripesWithSizes(
|
||||
int l0Count, long l0Size, List<List<Long>> sizes) throws Exception {
|
||||
List<byte[]> boundaries = createBoundaries(sizes.size());
|
||||
List<Long> l0Sizes = new ArrayList<Long>();
|
||||
for (int i = 0; i < l0Count; ++i) {
|
||||
l0Sizes.add(l0Size);
|
||||
}
|
||||
return createStripes(boundaries, sizes, l0Sizes);
|
||||
}
|
||||
|
||||
private static List<byte[]> createBoundaries(int stripeCount) {
|
||||
byte[][] keys = new byte[][] { KEY_A, KEY_B, KEY_C, KEY_D, KEY_E };
|
||||
assert stripeCount <= keys.length + 1;
|
||||
List<byte[]> boundaries = new ArrayList<byte[]>();
|
||||
for (int i = 0; i < stripeCount - 1; ++i) {
|
||||
boundaries.add(keys[i]);
|
||||
}
|
||||
return boundaries;
|
||||
}
|
||||
|
||||
private static StripeInformationProvider createStripes(List<byte[]> boundaries,
|
||||
List<List<Long>> stripeSizes, List<Long> l0Sizes) throws Exception {
|
||||
List<List<StoreFile>> stripeFiles = new ArrayList<List<StoreFile>>(stripeSizes.size());
|
||||
for (List<Long> sizes : stripeSizes) {
|
||||
List<StoreFile> sfs = new ArrayList<StoreFile>();
|
||||
for (Long size : sizes) {
|
||||
sfs.add(createFile(size));
|
||||
}
|
||||
stripeFiles.add(sfs);
|
||||
}
|
||||
List<StoreFile> l0Files = new ArrayList<StoreFile>();
|
||||
for (Long size : l0Sizes) {
|
||||
l0Files.add(createFile(size));
|
||||
}
|
||||
return createStripesWithFiles(boundaries, stripeFiles, l0Files);
|
||||
}
|
||||
|
||||
/**
|
||||
* This method actually does all the work.
|
||||
*/
|
||||
private static StripeInformationProvider createStripesWithFiles(List<byte[]> boundaries,
|
||||
List<List<StoreFile>> stripeFiles, List<StoreFile> l0Files) throws Exception {
|
||||
ArrayList<ImmutableList<StoreFile>> stripes = new ArrayList<ImmutableList<StoreFile>>();
|
||||
ArrayList<byte[]> boundariesList = new ArrayList<byte[]>();
|
||||
StripeInformationProvider si = mock(StripeInformationProvider.class);
|
||||
if (!stripeFiles.isEmpty()) {
|
||||
assert stripeFiles.size() == (boundaries.size() + 1);
|
||||
boundariesList.add(OPEN_KEY);
|
||||
for (int i = 0; i <= boundaries.size(); ++i) {
|
||||
byte[] startKey = ((i == 0) ? OPEN_KEY : boundaries.get(i - 1));
|
||||
byte[] endKey = ((i == boundaries.size()) ? OPEN_KEY : boundaries.get(i));
|
||||
boundariesList.add(endKey);
|
||||
for (StoreFile sf : stripeFiles.get(i)) {
|
||||
setFileStripe(sf, startKey, endKey);
|
||||
}
|
||||
stripes.add(ImmutableList.copyOf(stripeFiles.get(i)));
|
||||
when(si.getStartRow(eq(i))).thenReturn(startKey);
|
||||
when(si.getEndRow(eq(i))).thenReturn(endKey);
|
||||
}
|
||||
}
|
||||
ConcatenatedLists<StoreFile> sfs = new ConcatenatedLists<StoreFile>();
|
||||
sfs.addAllSublists(stripes);
|
||||
sfs.addSublist(l0Files);
|
||||
when(si.getStorefiles()).thenReturn(sfs);
|
||||
when(si.getStripes()).thenReturn(stripes);
|
||||
when(si.getStripeBoundaries()).thenReturn(boundariesList);
|
||||
when(si.getStripeCount()).thenReturn(stripes.size());
|
||||
when(si.getLevel0Files()).thenReturn(l0Files);
|
||||
return si;
|
||||
}
|
||||
|
||||
private static StoreFile createFile(long size) throws Exception {
|
||||
StoreFile sf = mock(StoreFile.class);
|
||||
when(sf.getPath()).thenReturn(new Path("moo"));
|
||||
StoreFile.Reader r = mock(StoreFile.Reader.class);
|
||||
when(r.getEntries()).thenReturn(size);
|
||||
when(r.length()).thenReturn(size);
|
||||
when(sf.getReader()).thenReturn(r);
|
||||
return sf;
|
||||
}
|
||||
|
||||
private static StoreFile createFile() throws Exception {
|
||||
return createFile(0);
|
||||
}
|
||||
|
||||
private static void setFileStripe(StoreFile sf, byte[] startKey, byte[] endKey) {
|
||||
when(sf.getMetadataValue(StripeStoreFileManager.STRIPE_START_KEY)).thenReturn(startKey);
|
||||
when(sf.getMetadataValue(StripeStoreFileManager.STRIPE_END_KEY)).thenReturn(endKey);
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue