HBASE-15400 Use DateTieredCompactor for Date Tiered Compaction (Clara Xiong)
This commit is contained in:
parent
0727d0f414
commit
b17350210b
|
@ -0,0 +1,102 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.regionserver;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.hbase.KeyValue.KVComparator;
|
||||||
|
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.compactions.DateTieredCompactionPolicy;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.compactions.DateTieredCompactionRequest;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.compactions.DateTieredCompactor;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
|
||||||
|
import org.apache.hadoop.hbase.security.User;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* HBASE-15400 This store engine allows us to store data in date tiered layout with exponential
|
||||||
|
* sizing so that the more recent data has more granularity. Time-range scan will perform the
|
||||||
|
* best with most recent data. When data reach maxAge, they are compacted in fixed-size time
|
||||||
|
* windows for TTL and archiving. Please refer to design spec for more details.
|
||||||
|
* https://docs.google.com/document/d/1_AmlNb2N8Us1xICsTeGDLKIqL6T-oHoRLZ323MG_uy8/edit#heading=h.uk6y5pd3oqgx
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
public class DateTieredStoreEngine extends StoreEngine<DefaultStoreFlusher,
|
||||||
|
DateTieredCompactionPolicy, DateTieredCompactor, DefaultStoreFileManager> {
|
||||||
|
@Override
|
||||||
|
public boolean needsCompaction(List<StoreFile> filesCompacting) {
|
||||||
|
return compactionPolicy.needsCompaction(storeFileManager.getStorefiles(),
|
||||||
|
filesCompacting);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public CompactionContext createCompaction() throws IOException {
|
||||||
|
return new DateTieredCompactionContext();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void createComponents(Configuration conf, Store store, KVComparator kvComparator)
|
||||||
|
throws IOException {
|
||||||
|
this.compactionPolicy = new DateTieredCompactionPolicy(conf, store);
|
||||||
|
this.storeFileManager = new DefaultStoreFileManager(kvComparator, conf,
|
||||||
|
compactionPolicy.getConf());
|
||||||
|
this.storeFlusher = new DefaultStoreFlusher(conf, store);
|
||||||
|
this.compactor = new DateTieredCompactor(conf, store);
|
||||||
|
}
|
||||||
|
|
||||||
|
private final class DateTieredCompactionContext extends CompactionContext {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public List<StoreFile> preSelect(List<StoreFile> filesCompacting) {
|
||||||
|
return compactionPolicy.preSelectCompactionForCoprocessor(storeFileManager.getStorefiles(),
|
||||||
|
filesCompacting);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean select(List<StoreFile> filesCompacting, boolean isUserCompaction,
|
||||||
|
boolean mayUseOffPeak, boolean forceMajor) throws IOException {
|
||||||
|
request = compactionPolicy.selectCompaction(storeFileManager.getStorefiles(), filesCompacting,
|
||||||
|
isUserCompaction, mayUseOffPeak, forceMajor);
|
||||||
|
return request != null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void forceSelect(CompactionRequest request) {
|
||||||
|
if (!(request instanceof DateTieredCompactionRequest)) {
|
||||||
|
throw new IllegalArgumentException("DateTieredCompactionRequest is expected. Actual: "
|
||||||
|
+ request.getClass().getCanonicalName());
|
||||||
|
}
|
||||||
|
super.forceSelect(request);
|
||||||
|
}
|
||||||
|
|
||||||
|
public List<Path> compact(ThroughputController throughputController, User user)
|
||||||
|
throws IOException {
|
||||||
|
if (request instanceof DateTieredCompactionRequest) {
|
||||||
|
return compactor.compact(request, ((DateTieredCompactionRequest) request).getBoundaries(),
|
||||||
|
throughputController, user);
|
||||||
|
} else {
|
||||||
|
throw new IllegalArgumentException("DateTieredCompactionRequest is expected. Actual: "
|
||||||
|
+ request.getClass().getCanonicalName());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -1556,7 +1556,7 @@ public class HStore implements Store {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return storeEngine.getCompactionPolicy().isMajorCompaction(
|
return storeEngine.getCompactionPolicy().shouldPerformMajorCompaction(
|
||||||
this.storeEngine.getStoreFileManager().getStorefiles());
|
this.storeEngine.getStoreFileManager().getStorefiles());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -18,6 +18,12 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hbase.regionserver;
|
package org.apache.hadoop.hbase.regionserver;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
import com.google.common.base.Function;
|
||||||
|
import com.google.common.base.Preconditions;
|
||||||
|
import com.google.common.collect.ImmutableList;
|
||||||
|
import com.google.common.collect.Ordering;
|
||||||
|
|
||||||
import java.io.DataInput;
|
import java.io.DataInput;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
|
@ -63,12 +69,6 @@ import org.apache.hadoop.hbase.util.Bytes;
|
||||||
import org.apache.hadoop.hbase.util.Writables;
|
import org.apache.hadoop.hbase.util.Writables;
|
||||||
import org.apache.hadoop.io.WritableUtils;
|
import org.apache.hadoop.io.WritableUtils;
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
|
||||||
import com.google.common.base.Function;
|
|
||||||
import com.google.common.base.Preconditions;
|
|
||||||
import com.google.common.collect.ImmutableList;
|
|
||||||
import com.google.common.collect.Ordering;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A Store data file. Stores usually have one or more of these files. They
|
* A Store data file. Stores usually have one or more of these files. They
|
||||||
* are produced by flushing the memstore to disk. To
|
* are produced by flushing the memstore to disk. To
|
||||||
|
@ -375,7 +375,7 @@ public class StoreFile {
|
||||||
* is turned off, fall back to BULKLOAD_TIME_KEY.
|
* is turned off, fall back to BULKLOAD_TIME_KEY.
|
||||||
* @return true if this storefile was created by bulk load.
|
* @return true if this storefile was created by bulk load.
|
||||||
*/
|
*/
|
||||||
boolean isBulkLoadResult() {
|
public boolean isBulkLoadResult() {
|
||||||
boolean bulkLoadedHFile = false;
|
boolean bulkLoadedHFile = false;
|
||||||
String fileName = this.getPath().getName();
|
String fileName = this.getPath().getName();
|
||||||
int startPos = fileName.indexOf("SeqId_");
|
int startPos = fileName.indexOf("SeqId_");
|
||||||
|
@ -1690,6 +1690,19 @@ public class StoreFile {
|
||||||
Ordering.natural().onResultOf(new GetPathName())
|
Ordering.natural().onResultOf(new GetPathName())
|
||||||
));
|
));
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Comparator for time-aware compaction. SeqId is still the first
|
||||||
|
* ordering criterion to maintain MVCC.
|
||||||
|
*/
|
||||||
|
public static final Comparator<StoreFile> SEQ_ID_MAX_TIMESTAMP =
|
||||||
|
Ordering.compound(ImmutableList.of(
|
||||||
|
Ordering.natural().onResultOf(new GetSeqId()),
|
||||||
|
Ordering.natural().onResultOf(new GetMaxTimestamp()),
|
||||||
|
Ordering.natural().onResultOf(new GetFileSize()).reverse(),
|
||||||
|
Ordering.natural().onResultOf(new GetBulkTime()),
|
||||||
|
Ordering.natural().onResultOf(new GetPathName())
|
||||||
|
));
|
||||||
|
|
||||||
private static class GetSeqId implements Function<StoreFile, Long> {
|
private static class GetSeqId implements Function<StoreFile, Long> {
|
||||||
@Override
|
@Override
|
||||||
public Long apply(StoreFile sf) {
|
public Long apply(StoreFile sf) {
|
||||||
|
@ -1724,5 +1737,12 @@ public class StoreFile {
|
||||||
return sf.getPath().getName();
|
return sf.getPath().getName();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static class GetMaxTimestamp implements Function<StoreFile, Long> {
|
||||||
|
@Override
|
||||||
|
public Long apply(StoreFile sf) {
|
||||||
|
return sf.getMaximumTimestamp() == null? (Long)Long.MAX_VALUE : sf.getMaximumTimestamp();
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -84,6 +84,8 @@ public class CompactionConfiguration {
|
||||||
"hbase.hstore.compaction.date.tiered.incoming.window.min";
|
"hbase.hstore.compaction.date.tiered.incoming.window.min";
|
||||||
public static final String COMPACTION_POLICY_CLASS_FOR_TIERED_WINDOWS_KEY =
|
public static final String COMPACTION_POLICY_CLASS_FOR_TIERED_WINDOWS_KEY =
|
||||||
"hbase.hstore.compaction.date.tiered.window.policy.class";
|
"hbase.hstore.compaction.date.tiered.window.policy.class";
|
||||||
|
public static final String SINGLE_OUTPUT_FOR_MINOR_COMPACTION_KEY =
|
||||||
|
"hbase.hstore.compaction.date.tiered.single.output.for.minor.compaction";
|
||||||
|
|
||||||
private static final Class<? extends RatioBasedCompactionPolicy>
|
private static final Class<? extends RatioBasedCompactionPolicy>
|
||||||
DEFAULT_TIER_COMPACTION_POLICY_CLASS = ExploringCompactionPolicy.class;
|
DEFAULT_TIER_COMPACTION_POLICY_CLASS = ExploringCompactionPolicy.class;
|
||||||
|
@ -109,6 +111,7 @@ public class CompactionConfiguration {
|
||||||
private final int windowsPerTier;
|
private final int windowsPerTier;
|
||||||
private final int incomingWindowMin;
|
private final int incomingWindowMin;
|
||||||
private final String compactionPolicyForTieredWindow;
|
private final String compactionPolicyForTieredWindow;
|
||||||
|
private final boolean singleOutputForMinorCompaction;
|
||||||
|
|
||||||
CompactionConfiguration(Configuration conf, StoreConfigInformation storeConfigInfo) {
|
CompactionConfiguration(Configuration conf, StoreConfigInformation storeConfigInfo) {
|
||||||
this.conf = conf;
|
this.conf = conf;
|
||||||
|
@ -138,6 +141,9 @@ public class CompactionConfiguration {
|
||||||
incomingWindowMin = conf.getInt(INCOMING_WINDOW_MIN_KEY, 6);
|
incomingWindowMin = conf.getInt(INCOMING_WINDOW_MIN_KEY, 6);
|
||||||
compactionPolicyForTieredWindow = conf.get(COMPACTION_POLICY_CLASS_FOR_TIERED_WINDOWS_KEY,
|
compactionPolicyForTieredWindow = conf.get(COMPACTION_POLICY_CLASS_FOR_TIERED_WINDOWS_KEY,
|
||||||
DEFAULT_TIER_COMPACTION_POLICY_CLASS.getName());
|
DEFAULT_TIER_COMPACTION_POLICY_CLASS.getName());
|
||||||
|
singleOutputForMinorCompaction = conf.getBoolean(SINGLE_OUTPUT_FOR_MINOR_COMPACTION_KEY,
|
||||||
|
true);
|
||||||
|
|
||||||
LOG.info(this);
|
LOG.info(this);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -278,4 +284,8 @@ public class CompactionConfiguration {
|
||||||
public String getCompactionPolicyForTieredWindow() {
|
public String getCompactionPolicyForTieredWindow() {
|
||||||
return compactionPolicyForTieredWindow;
|
return compactionPolicyForTieredWindow;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public boolean useSingleOutputForMinorCompaction() {
|
||||||
|
return singleOutputForMinorCompaction;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -45,7 +45,7 @@ public abstract class CompactionPolicy {
|
||||||
* @param filesToCompact Files to compact. Can be null.
|
* @param filesToCompact Files to compact. Can be null.
|
||||||
* @return True if we should run a major compaction.
|
* @return True if we should run a major compaction.
|
||||||
*/
|
*/
|
||||||
public abstract boolean isMajorCompaction(
|
public abstract boolean shouldPerformMajorCompaction(
|
||||||
final Collection<StoreFile> filesToCompact) throws IOException;
|
final Collection<StoreFile> filesToCompact) throws IOException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -18,6 +18,12 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hbase.regionserver.compactions;
|
package org.apache.hadoop.hbase.regionserver.compactions;
|
||||||
|
|
||||||
|
import com.google.common.base.Function;
|
||||||
|
import com.google.common.base.Joiner;
|
||||||
|
import com.google.common.base.Preconditions;
|
||||||
|
import com.google.common.base.Predicate;
|
||||||
|
import com.google.common.collect.Collections2;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
|
|
||||||
|
@ -31,12 +37,6 @@ import org.apache.hadoop.hbase.regionserver.StoreFile.Reader;
|
||||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||||
import org.apache.hadoop.util.StringUtils;
|
import org.apache.hadoop.util.StringUtils;
|
||||||
|
|
||||||
import com.google.common.base.Function;
|
|
||||||
import com.google.common.base.Joiner;
|
|
||||||
import com.google.common.base.Preconditions;
|
|
||||||
import com.google.common.base.Predicate;
|
|
||||||
import com.google.common.collect.Collections2;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This class holds all logical details necessary to run a compaction.
|
* This class holds all logical details necessary to run a compaction.
|
||||||
*/
|
*/
|
||||||
|
@ -74,6 +74,10 @@ public class CompactionRequest implements Comparable<CompactionRequest> {
|
||||||
recalculateSize();
|
recalculateSize();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void updateFiles(Collection<StoreFile> files) {
|
||||||
|
this.filesToCompact = files;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Called before compaction is executed by CompactSplitThread; for use by coproc subclasses.
|
* Called before compaction is executed by CompactSplitThread; for use by coproc subclasses.
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -1,4 +1,5 @@
|
||||||
/**
|
/**
|
||||||
|
*
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
* or more contributor license agreements. See the NOTICE file
|
* or more contributor license agreements. See the NOTICE file
|
||||||
* distributed with this work for additional information
|
* distributed with this work for additional information
|
||||||
|
@ -19,11 +20,11 @@ package org.apache.hadoop.hbase.regionserver.compactions;
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import com.google.common.base.Predicate;
|
import com.google.common.base.Predicate;
|
||||||
import com.google.common.collect.ImmutableList;
|
|
||||||
import com.google.common.collect.Iterables;
|
import com.google.common.collect.Iterables;
|
||||||
import com.google.common.collect.Iterators;
|
import com.google.common.collect.Iterators;
|
||||||
import com.google.common.collect.Lists;
|
import com.google.common.collect.Lists;
|
||||||
import com.google.common.collect.PeekingIterator;
|
import com.google.common.collect.PeekingIterator;
|
||||||
|
import com.google.common.math.LongMath;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
@ -35,9 +36,13 @@ import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
|
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
|
||||||
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
|
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
|
||||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
|
||||||
import org.apache.hadoop.hbase.regionserver.StoreConfigInformation;
|
import org.apache.hadoop.hbase.regionserver.StoreConfigInformation;
|
||||||
import org.apache.hadoop.hbase.regionserver.StoreFile;
|
import org.apache.hadoop.hbase.regionserver.StoreFile;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.StoreUtils;
|
||||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||||
import org.apache.hadoop.hbase.util.Pair;
|
import org.apache.hadoop.hbase.util.Pair;
|
||||||
import org.apache.hadoop.hbase.util.ReflectionUtils;
|
import org.apache.hadoop.hbase.util.ReflectionUtils;
|
||||||
|
@ -50,14 +55,13 @@ import org.apache.hadoop.hbase.util.ReflectionUtils;
|
||||||
* 3. Improve TTL efficiency.
|
* 3. Improve TTL efficiency.
|
||||||
* Perfect fit for the use cases that:
|
* Perfect fit for the use cases that:
|
||||||
* 1. has mostly date-based data write and scan and a focus on the most recent data.
|
* 1. has mostly date-based data write and scan and a focus on the most recent data.
|
||||||
* 2. never or rarely deletes data. Out-of-order writes are handled gracefully. Time range
|
* Out-of-order writes are handled gracefully. Time range overlapping among store files is
|
||||||
* overlapping among store files is tolerated and the performance impact is minimized. Configuration
|
* tolerated and the performance impact is minimized. Configuration can be set at hbase-site
|
||||||
* can be set at hbase-site or overriden at per-table or per-column-famly level by hbase shell.
|
* or overridden at per-table or per-column-family level by hbase shell. Design spec is at
|
||||||
* Design spec is at
|
|
||||||
* https://docs.google.com/document/d/1_AmlNb2N8Us1xICsTeGDLKIqL6T-oHoRLZ323MG_uy8/
|
* https://docs.google.com/document/d/1_AmlNb2N8Us1xICsTeGDLKIqL6T-oHoRLZ323MG_uy8/
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
|
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
|
||||||
public class DateTieredCompactionPolicy extends RatioBasedCompactionPolicy {
|
public class DateTieredCompactionPolicy extends SortedCompactionPolicy {
|
||||||
private static final Log LOG = LogFactory.getLog(DateTieredCompactionPolicy.class);
|
private static final Log LOG = LogFactory.getLog(DateTieredCompactionPolicy.class);
|
||||||
|
|
||||||
private RatioBasedCompactionPolicy compactionPolicyPerWindow;
|
private RatioBasedCompactionPolicy compactionPolicyPerWindow;
|
||||||
|
@ -67,111 +71,112 @@ public class DateTieredCompactionPolicy extends RatioBasedCompactionPolicy {
|
||||||
super(conf, storeConfigInfo);
|
super(conf, storeConfigInfo);
|
||||||
try {
|
try {
|
||||||
compactionPolicyPerWindow =
|
compactionPolicyPerWindow =
|
||||||
ReflectionUtils.instantiateWithCustomCtor(comConf.getCompactionPolicyForTieredWindow(),
|
ReflectionUtils.instantiateWithCustomCtor(comConf.getCompactionPolicyForTieredWindow(),
|
||||||
new Class[] { Configuration.class, StoreConfigInformation.class }, new Object[] { conf,
|
new Class[] { Configuration.class, StoreConfigInformation.class }, new Object[] { conf,
|
||||||
storeConfigInfo });
|
storeConfigInfo });
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
throw new IOException("Unable to load configured compaction policy '"
|
throw new IOException("Unable to load configured compaction policy '"
|
||||||
+ comConf.getCompactionPolicyForTieredWindow() + "'", e);
|
+ comConf.getCompactionPolicyForTieredWindow() + "'", e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean isMajorCompaction(Collection<StoreFile> filesToCompact) throws IOException {
|
|
||||||
// Never do major compaction unless forced
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
/**
|
/**
|
||||||
* Heuristics for guessing whether we need compaction.
|
* Heuristics for guessing whether we need minor compaction.
|
||||||
*/
|
*/
|
||||||
public boolean needsCompaction(final Collection<StoreFile> storeFiles,
|
@Override
|
||||||
final List<StoreFile> filesCompacting) {
|
|
||||||
return needsCompaction(storeFiles, filesCompacting, EnvironmentEdgeManager.currentTime());
|
|
||||||
}
|
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
public boolean needsCompaction(final Collection<StoreFile> storeFiles,
|
public boolean needsCompaction(final Collection<StoreFile> storeFiles,
|
||||||
final List<StoreFile> filesCompacting, long now) {
|
final List<StoreFile> filesCompacting) {
|
||||||
if (!super.needsCompaction(storeFiles, filesCompacting)) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
ArrayList<StoreFile> candidates = new ArrayList<StoreFile>(storeFiles);
|
ArrayList<StoreFile> candidates = new ArrayList<StoreFile>(storeFiles);
|
||||||
candidates = filterBulk(candidates);
|
|
||||||
candidates = skipLargeFiles(candidates, true);
|
|
||||||
try {
|
try {
|
||||||
candidates = applyCompactionPolicy(candidates, true, false, now);
|
return selectMinorCompaction(candidates, false, true) != null;
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
LOG.error("Can not check for compaction: ", e);
|
LOG.error("Can not check for compaction: ", e);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
return candidates != null && candidates.size() >= comConf.getMinFilesToCompact();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
public boolean shouldPerformMajorCompaction(final Collection<StoreFile> filesToCompact)
|
||||||
* Could return null if no candidates are found
|
throws IOException {
|
||||||
*/
|
long mcTime = getNextMajorCompactTime(filesToCompact);
|
||||||
@Override
|
if (filesToCompact == null || mcTime == 0) {
|
||||||
public ArrayList<StoreFile> applyCompactionPolicy(ArrayList<StoreFile> candidates,
|
return false;
|
||||||
boolean mayUseOffPeak, boolean mayBeStuck) throws IOException {
|
|
||||||
return applyCompactionPolicy(candidates, mayUseOffPeak, mayBeStuck,
|
|
||||||
EnvironmentEdgeManager.currentTime());
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Input candidates are sorted from oldest to newest by seqId. Could return null if no candidates
|
|
||||||
* are found.
|
|
||||||
*/
|
|
||||||
@VisibleForTesting
|
|
||||||
public ArrayList<StoreFile> applyCompactionPolicy(ArrayList<StoreFile> candidates,
|
|
||||||
boolean mayUseOffPeak, boolean mayBeStuck, long now) throws IOException {
|
|
||||||
Iterable<StoreFile> candidatesInWindow =
|
|
||||||
filterOldStoreFiles(Lists.newArrayList(candidates), comConf.getMaxStoreFileAgeMillis(), now);
|
|
||||||
|
|
||||||
List<ArrayList<StoreFile>> buckets =
|
|
||||||
partitionFilesToBuckets(candidatesInWindow, comConf.getBaseWindowMillis(),
|
|
||||||
comConf.getWindowsPerTier(), now);
|
|
||||||
LOG.debug("Compaction buckets are: " + buckets);
|
|
||||||
if (buckets.size() >= storeConfigInfo.getBlockingFileCount()) {
|
|
||||||
LOG.warn("Number of compaction buckets:" + buckets.size()
|
|
||||||
+ ", exceeds blocking file count setting: "
|
|
||||||
+ storeConfigInfo.getBlockingFileCount()
|
|
||||||
+ ", either increase hbase.hstore.blockingStoreFiles or "
|
|
||||||
+ "reduce the number of tiered compaction windows");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return newestBucket(buckets, comConf.getIncomingWindowMin(), now,
|
// TODO: Use better method for determining stamp of last major (HBASE-2990)
|
||||||
comConf.getBaseWindowMillis(), mayUseOffPeak);
|
long lowTimestamp = StoreUtils.getLowestTimestamp(filesToCompact);
|
||||||
}
|
long now = EnvironmentEdgeManager.currentTime();
|
||||||
|
if (lowTimestamp <= 0L || lowTimestamp >= (now - mcTime)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
long cfTTL = this.storeConfigInfo.getStoreFileTtl();
|
||||||
* @param buckets the list of buckets, sorted from newest to oldest, from which to return the
|
HDFSBlocksDistribution hdfsBlocksDistribution = new HDFSBlocksDistribution();
|
||||||
* newest bucket within thresholds.
|
long oldestToCompact = getOldestToCompact(comConf.getMaxStoreFileAgeMillis(), now);
|
||||||
* @param incomingWindowThreshold minimum number of storeFiles in a bucket to qualify.
|
List<Long> boundaries = getCompactBoundariesForMajor(filesToCompact, oldestToCompact, now);
|
||||||
* @param maxThreshold maximum number of storeFiles to compact at once (the returned bucket will
|
boolean[] filesInWindow = new boolean[boundaries.size()];
|
||||||
* be trimmed down to this).
|
|
||||||
* @return a bucket (a list of store files within a window to be compacted).
|
for (StoreFile file: filesToCompact) {
|
||||||
* @throws IOException error
|
Long minTimestamp = file.getMinimumTimestamp();
|
||||||
*/
|
long oldest = (minTimestamp == null) ? (Long)Long.MIN_VALUE : now - minTimestamp.longValue();
|
||||||
private ArrayList<StoreFile> newestBucket(List<ArrayList<StoreFile>> buckets,
|
if (cfTTL != HConstants.FOREVER && oldest >= cfTTL) {
|
||||||
int incomingWindowThreshold, long now, long baseWindowMillis, boolean mayUseOffPeak)
|
LOG.debug("Major compaction triggered on store " + this
|
||||||
throws IOException {
|
+ "; for TTL maintenance");
|
||||||
Window incomingWindow = getInitialWindow(now, baseWindowMillis);
|
return true;
|
||||||
for (ArrayList<StoreFile> bucket : buckets) {
|
|
||||||
int minThreshold =
|
|
||||||
incomingWindow.compareToTimestamp(bucket.get(0).getMaximumTimestamp()) <= 0 ? comConf
|
|
||||||
.getIncomingWindowMin() : comConf.getMinFilesToCompact();
|
|
||||||
compactionPolicyPerWindow.setMinThreshold(minThreshold);
|
|
||||||
ArrayList<StoreFile> candidates =
|
|
||||||
compactionPolicyPerWindow.applyCompactionPolicy(bucket, mayUseOffPeak, false);
|
|
||||||
if (candidates != null && !candidates.isEmpty()) {
|
|
||||||
return candidates;
|
|
||||||
}
|
}
|
||||||
|
if (!file.isMajorCompaction() || file.isBulkLoadResult()) {
|
||||||
|
LOG.debug("Major compaction triggered on store " + this
|
||||||
|
+ ", because there are new files and time since last major compaction "
|
||||||
|
+ (now - lowTimestamp) + "ms");
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
int lowerWindowIndex = Collections.binarySearch(boundaries,
|
||||||
|
minTimestamp == null ? (Long)Long.MAX_VALUE : minTimestamp);
|
||||||
|
int upperWindowIndex = Collections.binarySearch(boundaries,
|
||||||
|
file.getMaximumTimestamp() == null ? (Long)Long.MAX_VALUE : file.getMaximumTimestamp());
|
||||||
|
if (lowerWindowIndex != upperWindowIndex) {
|
||||||
|
LOG.debug("Major compaction triggered on store " + this + "; because file "
|
||||||
|
+ file.getPath() + " has data with timestamps cross window boundaries");
|
||||||
|
return true;
|
||||||
|
} else if (filesInWindow[upperWindowIndex]) {
|
||||||
|
LOG.debug("Major compaction triggered on store " + this +
|
||||||
|
"; because there are more than one file in some windows");
|
||||||
|
return true;
|
||||||
|
} else {
|
||||||
|
filesInWindow[upperWindowIndex] = true;
|
||||||
|
}
|
||||||
|
hdfsBlocksDistribution.add(file.getHDFSBlockDistribution());
|
||||||
}
|
}
|
||||||
return null;
|
|
||||||
|
float blockLocalityIndex = hdfsBlocksDistribution
|
||||||
|
.getBlockLocalityIndex(RSRpcServices.getHostname(comConf.conf, false));
|
||||||
|
if (blockLocalityIndex < comConf.getMinLocalityToForceCompact()) {
|
||||||
|
LOG.debug("Major compaction triggered on store " + this
|
||||||
|
+ "; to make hdfs blocks local, current blockLocalityIndex is "
|
||||||
|
+ blockLocalityIndex + " (min " + comConf.getMinLocalityToForceCompact() + ")");
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
LOG.debug("Skipping major compaction of " + this +
|
||||||
|
", because the files are already major compacted");
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected CompactionRequest createCompactionRequest(ArrayList<StoreFile> candidateSelection,
|
||||||
|
boolean tryingMajor, boolean mayUseOffPeak, boolean mayBeStuck) throws IOException {
|
||||||
|
CompactionRequest result = tryingMajor ? selectMajorCompaction(candidateSelection)
|
||||||
|
: selectMinorCompaction(candidateSelection, mayUseOffPeak, mayBeStuck);
|
||||||
|
LOG.debug("Generated compaction request: " + result);
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
public CompactionRequest selectMajorCompaction(ArrayList<StoreFile> candidateSelection) {
|
||||||
|
long now = EnvironmentEdgeManager.currentTime();
|
||||||
|
long oldestToCompact = getOldestToCompact(comConf.getMaxStoreFileAgeMillis(), now);
|
||||||
|
return new DateTieredCompactionRequest(candidateSelection,
|
||||||
|
this.getCompactBoundariesForMajor(candidateSelection, oldestToCompact, now));
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -179,63 +184,134 @@ public class DateTieredCompactionPolicy extends RatioBasedCompactionPolicy {
|
||||||
* current file has a maxTimestamp older than last known maximum, treat this file as it carries
|
* current file has a maxTimestamp older than last known maximum, treat this file as it carries
|
||||||
* the last known maximum. This way both seqId and timestamp are in the same order. If files carry
|
* the last known maximum. This way both seqId and timestamp are in the same order. If files carry
|
||||||
* the same maxTimestamps, they are ordered by seqId. We then reverse the list so they are ordered
|
* the same maxTimestamps, they are ordered by seqId. We then reverse the list so they are ordered
|
||||||
* by seqId and maxTimestamp in decending order and build the time windows. All the out-of-order
|
* by seqId and maxTimestamp in descending order and build the time windows. All the out-of-order
|
||||||
* data into the same compaction windows, guaranteeing contiguous compaction based on sequence id.
|
* data into the same compaction windows, guaranteeing contiguous compaction based on sequence id.
|
||||||
*/
|
*/
|
||||||
private static List<ArrayList<StoreFile>> partitionFilesToBuckets(Iterable<StoreFile> storeFiles,
|
public CompactionRequest selectMinorCompaction(ArrayList<StoreFile> candidateSelection,
|
||||||
long baseWindowSizeMillis, int windowsPerTier, long now) {
|
boolean mayUseOffPeak, boolean mayBeStuck) throws IOException {
|
||||||
List<ArrayList<StoreFile>> buckets = Lists.newArrayList();
|
long now = EnvironmentEdgeManager.currentTime();
|
||||||
Window window = getInitialWindow(now, baseWindowSizeMillis);
|
long oldestToCompact = getOldestToCompact(comConf.getMaxStoreFileAgeMillis(), now);
|
||||||
|
|
||||||
|
// Make sure the store files is sorted by SeqId then maxTimestamp
|
||||||
|
List<StoreFile> storeFileList = Lists.newArrayList(filterOldStoreFiles(candidateSelection,
|
||||||
|
oldestToCompact));
|
||||||
|
Collections.sort(storeFileList, StoreFile.Comparators.SEQ_ID_MAX_TIMESTAMP);
|
||||||
|
|
||||||
List<Pair<StoreFile, Long>> storefileMaxTimestampPairs =
|
List<Pair<StoreFile, Long>> storefileMaxTimestampPairs =
|
||||||
Lists.newArrayListWithCapacity(Iterables.size(storeFiles));
|
Lists.newArrayListWithCapacity(Iterables.size(storeFileList));
|
||||||
long maxTimestampSeen = Long.MIN_VALUE;
|
long maxTimestampSeen = Long.MIN_VALUE;
|
||||||
for (StoreFile storeFile : storeFiles) {
|
for (StoreFile storeFile : storeFileList) {
|
||||||
// if there is out-of-order data,
|
// if there is out-of-order data,
|
||||||
// we put them in the same window as the last file in increasing order
|
// we put them in the same window as the last file in increasing order
|
||||||
maxTimestampSeen = Math.max(maxTimestampSeen, storeFile.getMaximumTimestamp());
|
maxTimestampSeen = Math.max(maxTimestampSeen,
|
||||||
|
storeFile.getMaximumTimestamp() == null? Long.MIN_VALUE : storeFile.getMaximumTimestamp());
|
||||||
storefileMaxTimestampPairs.add(new Pair<StoreFile, Long>(storeFile, maxTimestampSeen));
|
storefileMaxTimestampPairs.add(new Pair<StoreFile, Long>(storeFile, maxTimestampSeen));
|
||||||
}
|
}
|
||||||
|
|
||||||
Collections.reverse(storefileMaxTimestampPairs);
|
Collections.reverse(storefileMaxTimestampPairs);
|
||||||
|
|
||||||
|
Window window = getIncomingWindow(now, comConf.getBaseWindowMillis());
|
||||||
|
int minThreshold = comConf.getIncomingWindowMin();
|
||||||
PeekingIterator<Pair<StoreFile, Long>> it =
|
PeekingIterator<Pair<StoreFile, Long>> it =
|
||||||
Iterators.peekingIterator(storefileMaxTimestampPairs.iterator());
|
Iterators.peekingIterator(storefileMaxTimestampPairs.iterator());
|
||||||
|
|
||||||
while (it.hasNext()) {
|
while (it.hasNext()) {
|
||||||
int compResult = window.compareToTimestamp(it.peek().getSecond());
|
int compResult = window.compareToTimestamp(it.peek().getSecond());
|
||||||
if (compResult > 0) {
|
if (compResult > 0) {
|
||||||
// If the file is too old for the window, switch to the next window
|
// If the file is too old for the window, switch to the next window
|
||||||
window = window.nextWindow(windowsPerTier);
|
window = window.nextWindow(comConf.getWindowsPerTier(),
|
||||||
|
oldestToCompact);
|
||||||
|
minThreshold = comConf.getMinFilesToCompact();
|
||||||
} else {
|
} else {
|
||||||
// The file is within the target window
|
// The file is within the target window
|
||||||
ArrayList<StoreFile> bucket = Lists.newArrayList();
|
ArrayList<StoreFile> fileList = Lists.newArrayList();
|
||||||
// Add all files in the same window to current bucket. For incoming window
|
// Add all files in the same window. For incoming window
|
||||||
// we tolerate files with future data although it is sub-optimal
|
// we tolerate files with future data although it is sub-optimal
|
||||||
while (it.hasNext() && window.compareToTimestamp(it.peek().getSecond()) <= 0) {
|
while (it.hasNext() && window.compareToTimestamp(it.peek().getSecond()) <= 0) {
|
||||||
bucket.add(it.next().getFirst());
|
fileList.add(it.next().getFirst());
|
||||||
}
|
}
|
||||||
if (!bucket.isEmpty()) {
|
if (fileList.size() >= minThreshold) {
|
||||||
buckets.add(bucket);
|
LOG.debug("Processing files: " + fileList + " for window: " + window);
|
||||||
|
DateTieredCompactionRequest request = generateCompactionRequest(fileList, window,
|
||||||
|
mayUseOffPeak, mayBeStuck, minThreshold);
|
||||||
|
if (request != null) {
|
||||||
|
return request;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
// A non-null file list is expected by HStore
|
||||||
|
return new CompactionRequest(Collections.<StoreFile> emptyList());
|
||||||
|
}
|
||||||
|
|
||||||
return buckets;
|
private DateTieredCompactionRequest generateCompactionRequest(ArrayList<StoreFile> storeFiles,
|
||||||
|
Window window, boolean mayUseOffPeak, boolean mayBeStuck, int minThreshold)
|
||||||
|
throws IOException {
|
||||||
|
// The files has to be in ascending order for ratio-based compaction to work right
|
||||||
|
// and removeExcessFile to exclude youngest files.
|
||||||
|
Collections.reverse(storeFiles);
|
||||||
|
|
||||||
|
// Compact everything in the window if have more files than comConf.maxBlockingFiles
|
||||||
|
compactionPolicyPerWindow.setMinThreshold(minThreshold);
|
||||||
|
ArrayList<StoreFile> storeFileSelection = mayBeStuck ? storeFiles
|
||||||
|
: compactionPolicyPerWindow.applyCompactionPolicy(storeFiles, mayUseOffPeak, false);
|
||||||
|
if (storeFileSelection != null && !storeFileSelection.isEmpty()) {
|
||||||
|
// If there is any file in the window excluded from compaction,
|
||||||
|
// only one file will be output from compaction.
|
||||||
|
boolean singleOutput = storeFiles.size() != storeFileSelection.size() ||
|
||||||
|
comConf.useSingleOutputForMinorCompaction();
|
||||||
|
List<Long> boundaries = getCompactionBoundariesForMinor(window, singleOutput);
|
||||||
|
DateTieredCompactionRequest result = new DateTieredCompactionRequest(storeFileSelection,
|
||||||
|
boundaries);
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Return a list of boundaries for multiple compaction output
|
||||||
|
* in ascending order.
|
||||||
|
*/
|
||||||
|
private List<Long> getCompactBoundariesForMajor(Collection<StoreFile> filesToCompact,
|
||||||
|
long oldestToCompact, long now) {
|
||||||
|
long minTimestamp = Long.MAX_VALUE;
|
||||||
|
for (StoreFile file : filesToCompact) {
|
||||||
|
minTimestamp = Math.min(minTimestamp,
|
||||||
|
file.getMinimumTimestamp() == null? Long.MAX_VALUE : file.getMinimumTimestamp());
|
||||||
|
}
|
||||||
|
|
||||||
|
List<Long> boundaries = new ArrayList<Long>();
|
||||||
|
|
||||||
|
// Add startMillis of all windows between now and min timestamp
|
||||||
|
for (Window window = getIncomingWindow(now, comConf.getBaseWindowMillis());
|
||||||
|
window.compareToTimestamp(minTimestamp) > 0;
|
||||||
|
window = window.nextWindow(comConf.getWindowsPerTier(), oldestToCompact)) {
|
||||||
|
boundaries.add(window.startMillis());
|
||||||
|
}
|
||||||
|
boundaries.add(Long.MIN_VALUE);
|
||||||
|
Collections.reverse(boundaries);
|
||||||
|
return boundaries;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return a list of boundaries for multiple compaction output
|
||||||
|
* from minTimestamp to maxTimestamp.
|
||||||
|
*/
|
||||||
|
private static List<Long> getCompactionBoundariesForMinor(Window window, boolean singleOutput) {
|
||||||
|
List<Long> boundaries = new ArrayList<Long>();
|
||||||
|
boundaries.add(Long.MIN_VALUE);
|
||||||
|
if (!singleOutput) {
|
||||||
|
boundaries.add(window.startMillis());
|
||||||
|
}
|
||||||
|
return boundaries;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Removes all store files with max timestamp older than (current - maxAge).
|
* Removes all store files with max timestamp older than (current - maxAge).
|
||||||
* @param storeFiles all store files to consider
|
* @param storeFiles all store files to consider
|
||||||
* @param maxAge the age in milliseconds when a store file stops participating in compaction.
|
* @param maxAge the age in milliseconds when a store file stops participating in compaction.
|
||||||
* @param now current time. store files with max timestamp less than (now - maxAge) are filtered.
|
|
||||||
* @return a list of storeFiles with the store file older than maxAge excluded
|
* @return a list of storeFiles with the store file older than maxAge excluded
|
||||||
*/
|
*/
|
||||||
private static Iterable<StoreFile> filterOldStoreFiles(List<StoreFile> storeFiles, long maxAge,
|
private static Iterable<StoreFile> filterOldStoreFiles(List<StoreFile> storeFiles,
|
||||||
long now) {
|
final long cutoff) {
|
||||||
if (maxAge == 0) {
|
|
||||||
return ImmutableList.of();
|
|
||||||
}
|
|
||||||
final long cutoff = now - maxAge;
|
|
||||||
return Iterables.filter(storeFiles, new Predicate<StoreFile>() {
|
return Iterables.filter(storeFiles, new Predicate<StoreFile>() {
|
||||||
@Override
|
@Override
|
||||||
public boolean apply(StoreFile storeFile) {
|
public boolean apply(StoreFile storeFile) {
|
||||||
|
@ -243,13 +319,24 @@ public class DateTieredCompactionPolicy extends RatioBasedCompactionPolicy {
|
||||||
if (storeFile == null) {
|
if (storeFile == null) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
return storeFile.getMaximumTimestamp() >= cutoff;
|
Long maxTimestamp = storeFile.getMaximumTimestamp();
|
||||||
|
return maxTimestamp == null ? true : maxTimestamp >= cutoff;
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
private static Window getInitialWindow(long now, long timeUnit) {
|
private static Window getIncomingWindow(long now, long baseWindowMillis) {
|
||||||
return new Window(timeUnit, now / timeUnit);
|
return new Window(baseWindowMillis, now / baseWindowMillis);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static long getOldestToCompact(long maxAgeMillis, long now) {
|
||||||
|
try {
|
||||||
|
return LongMath.checkedSubtract(now, maxAgeMillis);
|
||||||
|
} catch (ArithmeticException ae) {
|
||||||
|
LOG.warn("Value for " + CompactionConfiguration.MAX_AGE_MILLIS_KEY + ": " + maxAgeMillis
|
||||||
|
+ ". All the files will be eligible for minor compaction.");
|
||||||
|
return Long.MIN_VALUE;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -268,7 +355,7 @@ public class DateTieredCompactionPolicy extends RatioBasedCompactionPolicy {
|
||||||
private final long divPosition;
|
private final long divPosition;
|
||||||
|
|
||||||
private Window(long baseWindowMillis, long divPosition) {
|
private Window(long baseWindowMillis, long divPosition) {
|
||||||
this.windowMillis = baseWindowMillis;
|
windowMillis = baseWindowMillis;
|
||||||
this.divPosition = divPosition;
|
this.divPosition = divPosition;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -279,6 +366,13 @@ public class DateTieredCompactionPolicy extends RatioBasedCompactionPolicy {
|
||||||
* or after than the timestamp.
|
* or after than the timestamp.
|
||||||
*/
|
*/
|
||||||
public int compareToTimestamp(long timestamp) {
|
public int compareToTimestamp(long timestamp) {
|
||||||
|
if (timestamp < 0) {
|
||||||
|
try {
|
||||||
|
timestamp = LongMath.checkedSubtract(timestamp, windowMillis - 1);
|
||||||
|
} catch (ArithmeticException ae) {
|
||||||
|
timestamp = Long.MIN_VALUE;
|
||||||
|
}
|
||||||
|
}
|
||||||
long pos = timestamp / windowMillis;
|
long pos = timestamp / windowMillis;
|
||||||
return divPosition == pos ? 0 : divPosition < pos ? -1 : 1;
|
return divPosition == pos ? 0 : divPosition < pos ? -1 : 1;
|
||||||
}
|
}
|
||||||
|
@ -290,12 +384,42 @@ public class DateTieredCompactionPolicy extends RatioBasedCompactionPolicy {
|
||||||
* following those will be <code>tierBase</code> times as big.
|
* following those will be <code>tierBase</code> times as big.
|
||||||
* @return The next window
|
* @return The next window
|
||||||
*/
|
*/
|
||||||
public Window nextWindow(int windowsPerTier) {
|
public Window nextWindow(int windowsPerTier, long oldestToCompact) {
|
||||||
if (divPosition % windowsPerTier > 0) {
|
// Don't promote to the next tier if there is not even 1 window at current tier
|
||||||
|
// or if the next window crosses the max age.
|
||||||
|
if (divPosition % windowsPerTier > 0 ||
|
||||||
|
startMillis() - windowMillis * windowsPerTier < oldestToCompact) {
|
||||||
return new Window(windowMillis, divPosition - 1);
|
return new Window(windowMillis, divPosition - 1);
|
||||||
} else {
|
} else {
|
||||||
return new Window(windowMillis * windowsPerTier, divPosition / windowsPerTier - 1);
|
return new Window(windowMillis * windowsPerTier, divPosition / windowsPerTier - 1);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Inclusive lower bound
|
||||||
|
*/
|
||||||
|
public long startMillis() {
|
||||||
|
try {
|
||||||
|
return LongMath.checkedMultiply(windowMillis, divPosition);
|
||||||
|
} catch (ArithmeticException ae) {
|
||||||
|
return Long.MIN_VALUE;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Exclusive upper bound
|
||||||
|
*/
|
||||||
|
public long endMillis() {
|
||||||
|
try {
|
||||||
|
return LongMath.checkedMultiply(windowMillis, (divPosition + 1));
|
||||||
|
} catch (ArithmeticException ae) {
|
||||||
|
return Long.MAX_VALUE;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return "[" + startMillis() + ", " + endMillis() + ")";
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
|
@ -0,0 +1,44 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.regionserver.compactions;
|
||||||
|
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.regionserver.StoreFile;
|
||||||
|
|
||||||
|
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="EQ_DOESNT_OVERRIDE_EQUALS",
|
||||||
|
justification="It is intended to use the same equal method as superclass")
|
||||||
|
public class DateTieredCompactionRequest extends CompactionRequest {
|
||||||
|
private List<Long> boundaries;
|
||||||
|
|
||||||
|
public DateTieredCompactionRequest(Collection<StoreFile> files, List<Long> boundaryList) {
|
||||||
|
super(files);
|
||||||
|
boundaries = boundaryList;
|
||||||
|
}
|
||||||
|
|
||||||
|
public List<Long> getBoundaries() {
|
||||||
|
return boundaries;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return super.toString() + " boundaries=" + Arrays.toString(boundaries.toArray());
|
||||||
|
}
|
||||||
|
}
|
|
@ -25,8 +25,8 @@ import java.util.List;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.hbase.regionserver.StoreConfigInformation;
|
import org.apache.hadoop.hbase.regionserver.StoreConfigInformation;
|
||||||
import org.apache.hadoop.hbase.regionserver.StoreFile;
|
import org.apache.hadoop.hbase.regionserver.StoreFile;
|
||||||
|
|
||||||
|
@ -51,7 +51,7 @@ public class ExploringCompactionPolicy extends RatioBasedCompactionPolicy {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
final ArrayList<StoreFile> applyCompactionPolicy(final ArrayList<StoreFile> candidates,
|
protected final ArrayList<StoreFile> applyCompactionPolicy(final ArrayList<StoreFile> candidates,
|
||||||
final boolean mayUseOffPeak, final boolean mightBeStuck) throws IOException {
|
final boolean mayUseOffPeak, final boolean mightBeStuck) throws IOException {
|
||||||
return new ArrayList<StoreFile>(applyCompactionPolicy(candidates, mightBeStuck,
|
return new ArrayList<StoreFile>(applyCompactionPolicy(candidates, mightBeStuck,
|
||||||
mayUseOffPeak, comConf.getMinFilesToCompact(), comConf.getMaxFilesToCompact()));
|
mayUseOffPeak, comConf.getMinFilesToCompact(), comConf.getMaxFilesToCompact()));
|
||||||
|
|
|
@ -76,11 +76,12 @@ public class FIFOCompactionPolicy extends ExploringCompactionPolicy {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean isMajorCompaction(Collection<StoreFile> filesToCompact) throws IOException {
|
public boolean shouldPerformMajorCompaction(Collection<StoreFile> filesToCompact)
|
||||||
|
throws IOException {
|
||||||
boolean isAfterSplit = StoreUtils.hasReferences(filesToCompact);
|
boolean isAfterSplit = StoreUtils.hasReferences(filesToCompact);
|
||||||
if(isAfterSplit){
|
if(isAfterSplit){
|
||||||
LOG.info("Split detected, delegate to the parent policy.");
|
LOG.info("Split detected, delegate to the parent policy.");
|
||||||
return super.isMajorCompaction(filesToCompact);
|
return super.shouldPerformMajorCompaction(filesToCompact);
|
||||||
}
|
}
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
|
@ -16,14 +16,12 @@
|
||||||
* See the License for the specific language governing permissions and
|
* See the License for the specific language governing permissions and
|
||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
package org.apache.hadoop.hbase.regionserver.compactions;
|
package org.apache.hadoop.hbase.regionserver.compactions;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Random;
|
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
@ -35,17 +33,13 @@ import org.apache.hadoop.hbase.regionserver.StoreConfigInformation;
|
||||||
import org.apache.hadoop.hbase.regionserver.StoreFile;
|
import org.apache.hadoop.hbase.regionserver.StoreFile;
|
||||||
import org.apache.hadoop.hbase.regionserver.StoreUtils;
|
import org.apache.hadoop.hbase.regionserver.StoreUtils;
|
||||||
|
|
||||||
import com.google.common.base.Preconditions;
|
|
||||||
import com.google.common.base.Predicate;
|
|
||||||
import com.google.common.collect.Collections2;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The default algorithm for selecting files for compaction.
|
* The default algorithm for selecting files for compaction.
|
||||||
* Combines the compaction configuration and the provisional file selection that
|
* Combines the compaction configuration and the provisional file selection that
|
||||||
* it's given to produce the list of suitable candidates for compaction.
|
* it's given to produce the list of suitable candidates for compaction.
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public class RatioBasedCompactionPolicy extends CompactionPolicy {
|
public class RatioBasedCompactionPolicy extends SortedCompactionPolicy {
|
||||||
private static final Log LOG = LogFactory.getLog(RatioBasedCompactionPolicy.class);
|
private static final Log LOG = LogFactory.getLog(RatioBasedCompactionPolicy.class);
|
||||||
|
|
||||||
public RatioBasedCompactionPolicy(Configuration conf,
|
public RatioBasedCompactionPolicy(Configuration conf,
|
||||||
|
@ -53,154 +47,72 @@ public class RatioBasedCompactionPolicy extends CompactionPolicy {
|
||||||
super(conf, storeConfigInfo);
|
super(conf, storeConfigInfo);
|
||||||
}
|
}
|
||||||
|
|
||||||
private ArrayList<StoreFile> getCurrentEligibleFiles(
|
/*
|
||||||
ArrayList<StoreFile> candidateFiles, final List<StoreFile> filesCompacting) {
|
* @param filesToCompact Files to compact. Can be null.
|
||||||
// candidates = all storefiles not already in compaction queue
|
* @return True if we should run a major compaction.
|
||||||
if (!filesCompacting.isEmpty()) {
|
|
||||||
// exclude all files older than the newest file we're currently
|
|
||||||
// compacting. this allows us to preserve contiguity (HBASE-2856)
|
|
||||||
StoreFile last = filesCompacting.get(filesCompacting.size() - 1);
|
|
||||||
int idx = candidateFiles.indexOf(last);
|
|
||||||
Preconditions.checkArgument(idx != -1);
|
|
||||||
candidateFiles.subList(0, idx + 1).clear();
|
|
||||||
}
|
|
||||||
return candidateFiles;
|
|
||||||
}
|
|
||||||
|
|
||||||
public List<StoreFile> preSelectCompactionForCoprocessor(
|
|
||||||
final Collection<StoreFile> candidates, final List<StoreFile> filesCompacting) {
|
|
||||||
return getCurrentEligibleFiles(new ArrayList<StoreFile>(candidates), filesCompacting);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @param candidateFiles candidate files, ordered from oldest to newest by seqId. We rely on
|
|
||||||
* DefaultStoreFileManager to sort the files by seqId to guarantee contiguous compaction based
|
|
||||||
* on seqId for data consistency.
|
|
||||||
* @return subset copy of candidate list that meets compaction criteria
|
|
||||||
* @throws java.io.IOException
|
|
||||||
*/
|
*/
|
||||||
public CompactionRequest selectCompaction(Collection<StoreFile> candidateFiles,
|
@Override
|
||||||
final List<StoreFile> filesCompacting, final boolean isUserCompaction,
|
public boolean shouldPerformMajorCompaction(final Collection<StoreFile> filesToCompact)
|
||||||
final boolean mayUseOffPeak, final boolean forceMajor) throws IOException {
|
throws IOException {
|
||||||
// Preliminary compaction subject to filters
|
boolean result = false;
|
||||||
ArrayList<StoreFile> candidateSelection = new ArrayList<StoreFile>(candidateFiles);
|
long mcTime = getNextMajorCompactTime(filesToCompact);
|
||||||
// Stuck and not compacting enough (estimate). It is not guaranteed that we will be
|
if (filesToCompact == null || filesToCompact.isEmpty() || mcTime == 0) {
|
||||||
// able to compact more if stuck and compacting, because ratio policy excludes some
|
return result;
|
||||||
// non-compacting files from consideration during compaction (see getCurrentEligibleFiles).
|
|
||||||
int futureFiles = filesCompacting.isEmpty() ? 0 : 1;
|
|
||||||
boolean mayBeStuck = (candidateFiles.size() - filesCompacting.size() + futureFiles)
|
|
||||||
>= storeConfigInfo.getBlockingFileCount();
|
|
||||||
candidateSelection = getCurrentEligibleFiles(candidateSelection, filesCompacting);
|
|
||||||
LOG.debug("Selecting compaction from " + candidateFiles.size() + " store files, " +
|
|
||||||
filesCompacting.size() + " compacting, " + candidateSelection.size() +
|
|
||||||
" eligible, " + storeConfigInfo.getBlockingFileCount() + " blocking");
|
|
||||||
|
|
||||||
// If we can't have all files, we cannot do major anyway
|
|
||||||
boolean isAllFiles = candidateFiles.size() == candidateSelection.size();
|
|
||||||
if (!(forceMajor && isAllFiles)) {
|
|
||||||
candidateSelection = skipLargeFiles(candidateSelection, mayUseOffPeak);
|
|
||||||
isAllFiles = candidateFiles.size() == candidateSelection.size();
|
|
||||||
}
|
}
|
||||||
|
// TODO: Use better method for determining stamp of last major (HBASE-2990)
|
||||||
// Try a major compaction if this is a user-requested major compaction,
|
long lowTimestamp = StoreUtils.getLowestTimestamp(filesToCompact);
|
||||||
// or if we do not have too many files to compact and this was requested as a major compaction
|
long now = System.currentTimeMillis();
|
||||||
boolean isTryingMajor = (forceMajor && isAllFiles && isUserCompaction)
|
if (lowTimestamp > 0L && lowTimestamp < (now - mcTime)) {
|
||||||
|| (((forceMajor && isAllFiles) || isMajorCompaction(candidateSelection))
|
// Major compaction time has elapsed.
|
||||||
&& (candidateSelection.size() < comConf.getMaxFilesToCompact()));
|
long cfTTL = this.storeConfigInfo.getStoreFileTtl();
|
||||||
// Or, if there are any references among the candidates.
|
if (filesToCompact.size() == 1) {
|
||||||
boolean isAfterSplit = StoreUtils.hasReferences(candidateSelection);
|
// Single file
|
||||||
if (!isTryingMajor && !isAfterSplit) {
|
StoreFile sf = filesToCompact.iterator().next();
|
||||||
// We're are not compacting all files, let's see what files are applicable
|
Long minTimestamp = sf.getMinimumTimestamp();
|
||||||
candidateSelection = filterBulk(candidateSelection);
|
long oldest = (minTimestamp == null) ? Long.MIN_VALUE : now - minTimestamp.longValue();
|
||||||
candidateSelection = applyCompactionPolicy(candidateSelection, mayUseOffPeak, mayBeStuck);
|
if (sf.isMajorCompaction() && (cfTTL == HConstants.FOREVER || oldest < cfTTL)) {
|
||||||
candidateSelection = checkMinFilesCriteria(candidateSelection);
|
float blockLocalityIndex =
|
||||||
|
sf.getHDFSBlockDistribution().getBlockLocalityIndex(
|
||||||
|
RSRpcServices.getHostname(comConf.conf, false));
|
||||||
|
if (blockLocalityIndex < comConf.getMinLocalityToForceCompact()) {
|
||||||
|
LOG.debug("Major compaction triggered on only store " + this
|
||||||
|
+ "; to make hdfs blocks local, current blockLocalityIndex is "
|
||||||
|
+ blockLocalityIndex + " (min " + comConf.getMinLocalityToForceCompact() + ")");
|
||||||
|
result = true;
|
||||||
|
} else {
|
||||||
|
LOG.debug("Skipping major compaction of " + this
|
||||||
|
+ " because one (major) compacted file only, oldestTime " + oldest
|
||||||
|
+ "ms is < TTL=" + cfTTL + " and blockLocalityIndex is " + blockLocalityIndex
|
||||||
|
+ " (min " + comConf.getMinLocalityToForceCompact() + ")");
|
||||||
|
}
|
||||||
|
} else if (cfTTL != HConstants.FOREVER && oldest > cfTTL) {
|
||||||
|
LOG.debug("Major compaction triggered on store " + this
|
||||||
|
+ ", because keyvalues outdated; time since last major compaction "
|
||||||
|
+ (now - lowTimestamp) + "ms");
|
||||||
|
result = true;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
LOG.debug("Major compaction triggered on store " + this
|
||||||
|
+ "; time since last major compaction " + (now - lowTimestamp) + "ms");
|
||||||
|
}
|
||||||
|
result = true;
|
||||||
}
|
}
|
||||||
candidateSelection = removeExcessFiles(candidateSelection, isUserCompaction, isTryingMajor);
|
|
||||||
// Now we have the final file list, so we can determine if we can do major/all files.
|
|
||||||
isAllFiles = (candidateFiles.size() == candidateSelection.size());
|
|
||||||
CompactionRequest result = new CompactionRequest(candidateSelection);
|
|
||||||
result.setOffPeak(!candidateSelection.isEmpty() && !isAllFiles && mayUseOffPeak);
|
|
||||||
result.setIsMajor(isTryingMajor && isAllFiles, isAllFiles);
|
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
@Override
|
||||||
* @param candidates pre-filtrate
|
protected CompactionRequest createCompactionRequest(ArrayList<StoreFile> candidateSelection,
|
||||||
* @return filtered subset
|
boolean tryingMajor, boolean mayUseOffPeak, boolean mayBeStuck) throws IOException {
|
||||||
* exclude all files above maxCompactSize
|
if (!tryingMajor) {
|
||||||
* Also save all references. We MUST compact them
|
candidateSelection = filterBulk(candidateSelection);
|
||||||
*/
|
candidateSelection = applyCompactionPolicy(candidateSelection, mayUseOffPeak, mayBeStuck);
|
||||||
protected ArrayList<StoreFile> skipLargeFiles(ArrayList<StoreFile> candidates,
|
candidateSelection = checkMinFilesCriteria(candidateSelection,
|
||||||
boolean mayUseOffpeak) {
|
comConf.getMinFilesToCompact());
|
||||||
int pos = 0;
|
|
||||||
while (pos < candidates.size() && !candidates.get(pos).isReference()
|
|
||||||
&& (candidates.get(pos).getReader().length() > comConf.getMaxCompactSize(mayUseOffpeak))) {
|
|
||||||
++pos;
|
|
||||||
}
|
}
|
||||||
if (pos > 0) {
|
return new CompactionRequest(candidateSelection);
|
||||||
LOG.debug("Some files are too large. Excluding " + pos
|
|
||||||
+ " files from compaction candidates");
|
|
||||||
candidates.subList(0, pos).clear();
|
|
||||||
}
|
|
||||||
return candidates;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param candidates pre-filtrate
|
|
||||||
* @return filtered subset
|
|
||||||
* exclude all bulk load files if configured
|
|
||||||
*/
|
|
||||||
protected ArrayList<StoreFile> filterBulk(ArrayList<StoreFile> candidates) {
|
|
||||||
candidates.removeAll(Collections2.filter(candidates,
|
|
||||||
new Predicate<StoreFile>() {
|
|
||||||
@Override
|
|
||||||
public boolean apply(StoreFile input) {
|
|
||||||
return input.excludeFromMinorCompaction();
|
|
||||||
}
|
|
||||||
}));
|
|
||||||
return candidates;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @param candidates pre-filtrate
|
|
||||||
* @return filtered subset
|
|
||||||
* take upto maxFilesToCompact from the start
|
|
||||||
*/
|
|
||||||
private ArrayList<StoreFile> removeExcessFiles(ArrayList<StoreFile> candidates,
|
|
||||||
boolean isUserCompaction, boolean isMajorCompaction) {
|
|
||||||
int excess = candidates.size() - comConf.getMaxFilesToCompact();
|
|
||||||
if (excess > 0) {
|
|
||||||
if (isMajorCompaction && isUserCompaction) {
|
|
||||||
LOG.debug("Warning, compacting more than " + comConf.getMaxFilesToCompact() +
|
|
||||||
" files because of a user-requested major compaction");
|
|
||||||
} else {
|
|
||||||
LOG.debug("Too many admissible files. Excluding " + excess
|
|
||||||
+ " files from compaction candidates");
|
|
||||||
candidates.subList(comConf.getMaxFilesToCompact(), candidates.size()).clear();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return candidates;
|
|
||||||
}
|
|
||||||
/**
|
|
||||||
* @param candidates pre-filtrate
|
|
||||||
* @return filtered subset
|
|
||||||
* forget the compactionSelection if we don't have enough files
|
|
||||||
*/
|
|
||||||
protected ArrayList<StoreFile> checkMinFilesCriteria(ArrayList<StoreFile> candidates) {
|
|
||||||
int minFiles = comConf.getMinFilesToCompact();
|
|
||||||
if (candidates.size() < minFiles) {
|
|
||||||
if(LOG.isDebugEnabled()) {
|
|
||||||
LOG.debug("Not compacting files because we only have " + candidates.size() +
|
|
||||||
" files ready for compaction. Need " + minFiles + " to initiate.");
|
|
||||||
}
|
|
||||||
candidates.clear();
|
|
||||||
}
|
|
||||||
return candidates;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @param candidates pre-filtrate
|
|
||||||
* @return filtered subset
|
|
||||||
* -- Default minor compaction selection algorithm:
|
* -- Default minor compaction selection algorithm:
|
||||||
* choose CompactSelection from candidates --
|
* choose CompactSelection from candidates --
|
||||||
* First exclude bulk-load files if indicated in configuration.
|
* First exclude bulk-load files if indicated in configuration.
|
||||||
|
@ -227,9 +139,11 @@ public class RatioBasedCompactionPolicy extends CompactionPolicy {
|
||||||
* | | | | | | | | _ | |
|
* | | | | | | | | _ | |
|
||||||
* | | | | | | | | | | | |
|
* | | | | | | | | | | | |
|
||||||
* | | | | | | | | | | | |
|
* | | | | | | | | | | | |
|
||||||
|
* @param candidates pre-filtrate
|
||||||
|
* @return filtered subset
|
||||||
*/
|
*/
|
||||||
ArrayList<StoreFile> applyCompactionPolicy(ArrayList<StoreFile> candidates,
|
protected ArrayList<StoreFile> applyCompactionPolicy(ArrayList<StoreFile> candidates,
|
||||||
boolean mayUseOffPeak, boolean mayBeStuck) throws IOException {
|
boolean mayUseOffPeak, boolean mayBeStuck) throws IOException {
|
||||||
if (candidates.isEmpty()) {
|
if (candidates.isEmpty()) {
|
||||||
return candidates;
|
return candidates;
|
||||||
}
|
}
|
||||||
|
@ -276,114 +190,12 @@ public class RatioBasedCompactionPolicy extends CompactionPolicy {
|
||||||
return candidates;
|
return candidates;
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
|
||||||
* @param filesToCompact Files to compact. Can be null.
|
|
||||||
* @return True if we should run a major compaction.
|
|
||||||
*/
|
|
||||||
@Override
|
|
||||||
public boolean isMajorCompaction(final Collection<StoreFile> filesToCompact)
|
|
||||||
throws IOException {
|
|
||||||
boolean result = false;
|
|
||||||
long mcTime = getNextMajorCompactTime(filesToCompact);
|
|
||||||
if (filesToCompact == null || filesToCompact.isEmpty() || mcTime == 0) {
|
|
||||||
return result;
|
|
||||||
}
|
|
||||||
// TODO: Use better method for determining stamp of last major (HBASE-2990)
|
|
||||||
long lowTimestamp = StoreUtils.getLowestTimestamp(filesToCompact);
|
|
||||||
long now = System.currentTimeMillis();
|
|
||||||
if (lowTimestamp > 0l && lowTimestamp < (now - mcTime)) {
|
|
||||||
// Major compaction time has elapsed.
|
|
||||||
long cfTtl = this.storeConfigInfo.getStoreFileTtl();
|
|
||||||
if (filesToCompact.size() == 1) {
|
|
||||||
// Single file
|
|
||||||
StoreFile sf = filesToCompact.iterator().next();
|
|
||||||
Long minTimestamp = sf.getMinimumTimestamp();
|
|
||||||
long oldest = (minTimestamp == null)
|
|
||||||
? Long.MIN_VALUE
|
|
||||||
: now - minTimestamp.longValue();
|
|
||||||
if (sf.isMajorCompaction() &&
|
|
||||||
(cfTtl == HConstants.FOREVER || oldest < cfTtl)) {
|
|
||||||
float blockLocalityIndex = sf.getHDFSBlockDistribution().getBlockLocalityIndex(
|
|
||||||
RSRpcServices.getHostname(comConf.conf, false)
|
|
||||||
);
|
|
||||||
if (blockLocalityIndex < comConf.getMinLocalityToForceCompact()) {
|
|
||||||
if (LOG.isDebugEnabled()) {
|
|
||||||
LOG.debug("Major compaction triggered on only store " + this +
|
|
||||||
"; to make hdfs blocks local, current blockLocalityIndex is " +
|
|
||||||
blockLocalityIndex + " (min " + comConf.getMinLocalityToForceCompact() +
|
|
||||||
")");
|
|
||||||
}
|
|
||||||
result = true;
|
|
||||||
} else {
|
|
||||||
if (LOG.isDebugEnabled()) {
|
|
||||||
LOG.debug("Skipping major compaction of " + this +
|
|
||||||
" because one (major) compacted file only, oldestTime " +
|
|
||||||
oldest + "ms is < ttl=" + cfTtl + " and blockLocalityIndex is " +
|
|
||||||
blockLocalityIndex + " (min " + comConf.getMinLocalityToForceCompact() +
|
|
||||||
")");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else if (cfTtl != HConstants.FOREVER && oldest > cfTtl) {
|
|
||||||
LOG.debug("Major compaction triggered on store " + this +
|
|
||||||
", because keyvalues outdated; time since last major compaction " +
|
|
||||||
(now - lowTimestamp) + "ms");
|
|
||||||
result = true;
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
if (LOG.isDebugEnabled()) {
|
|
||||||
LOG.debug("Major compaction triggered on store " + this +
|
|
||||||
"; time since last major compaction " + (now - lowTimestamp) + "ms");
|
|
||||||
}
|
|
||||||
result = true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return result;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Used calculation jitter
|
* A heuristic method to decide whether to schedule a compaction request
|
||||||
|
* @param storeFiles files in the store.
|
||||||
|
* @param filesCompacting files being scheduled to compact.
|
||||||
|
* @return true to schedule a request.
|
||||||
*/
|
*/
|
||||||
private final Random random = new Random();
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @param filesToCompact
|
|
||||||
* @return When to run next major compaction
|
|
||||||
*/
|
|
||||||
public long getNextMajorCompactTime(final Collection<StoreFile> filesToCompact) {
|
|
||||||
// default = 24hrs
|
|
||||||
long ret = comConf.getMajorCompactionPeriod();
|
|
||||||
if (ret > 0) {
|
|
||||||
// default = 20% = +/- 4.8 hrs
|
|
||||||
double jitterPct = comConf.getMajorCompactionJitter();
|
|
||||||
if (jitterPct > 0) {
|
|
||||||
long jitter = Math.round(ret * jitterPct);
|
|
||||||
// deterministic jitter avoids a major compaction storm on restart
|
|
||||||
Integer seed = StoreUtils.getDeterministicRandomSeed(filesToCompact);
|
|
||||||
if (seed != null) {
|
|
||||||
// Synchronized to ensure one user of random instance at a time.
|
|
||||||
double rnd = -1;
|
|
||||||
synchronized (this) {
|
|
||||||
this.random.setSeed(seed);
|
|
||||||
rnd = this.random.nextDouble();
|
|
||||||
}
|
|
||||||
ret += jitter - Math.round(2L * jitter * rnd);
|
|
||||||
} else {
|
|
||||||
ret = 0; // If seed is null, then no storefiles == no major compaction
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return ret;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @param compactionSize Total size of some compaction
|
|
||||||
* @return whether this should be a large or small compaction
|
|
||||||
*/
|
|
||||||
@Override
|
|
||||||
public boolean throttleCompaction(long compactionSize) {
|
|
||||||
return compactionSize > comConf.getThrottlePoint();
|
|
||||||
}
|
|
||||||
|
|
||||||
public boolean needsCompaction(final Collection<StoreFile> storeFiles,
|
public boolean needsCompaction(final Collection<StoreFile> storeFiles,
|
||||||
final List<StoreFile> filesCompacting) {
|
final List<StoreFile> filesCompacting) {
|
||||||
int numCandidates = storeFiles.size() - filesCompacting.size();
|
int numCandidates = storeFiles.size() - filesCompacting.size();
|
||||||
|
@ -392,7 +204,6 @@ public class RatioBasedCompactionPolicy extends CompactionPolicy {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Overwrite min threshold for compaction
|
* Overwrite min threshold for compaction
|
||||||
* @param minThreshold min to update to
|
|
||||||
*/
|
*/
|
||||||
public void setMinThreshold(int minThreshold) {
|
public void setMinThreshold(int minThreshold) {
|
||||||
comConf.setMinFilesToCompact(minThreshold);
|
comConf.setMinFilesToCompact(minThreshold);
|
||||||
|
|
|
@ -0,0 +1,239 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
|
||||||
|
* agreements. See the NOTICE file distributed with this work for additional information regarding
|
||||||
|
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance with the License. You may obtain a
|
||||||
|
* copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
|
||||||
|
* law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
|
||||||
|
* BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
|
||||||
|
* for the specific language governing permissions and limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.regionserver.compactions;
|
||||||
|
|
||||||
|
import com.google.common.base.Preconditions;
|
||||||
|
import com.google.common.base.Predicate;
|
||||||
|
import com.google.common.collect.Collections2;
|
||||||
|
import com.google.common.collect.Lists;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Random;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.StoreConfigInformation;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.StoreFile;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.StoreUtils;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* An abstract compaction policy that select files on seq id order.
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
public abstract class SortedCompactionPolicy extends CompactionPolicy {
|
||||||
|
|
||||||
|
private static final Log LOG = LogFactory.getLog(SortedCompactionPolicy.class);
|
||||||
|
|
||||||
|
public SortedCompactionPolicy(Configuration conf, StoreConfigInformation storeConfigInfo) {
|
||||||
|
super(conf, storeConfigInfo);
|
||||||
|
}
|
||||||
|
|
||||||
|
public List<StoreFile> preSelectCompactionForCoprocessor(final Collection<StoreFile> candidates,
|
||||||
|
final List<StoreFile> filesCompacting) {
|
||||||
|
return getCurrentEligibleFiles(new ArrayList<StoreFile>(candidates), filesCompacting);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param candidateFiles candidate files, ordered from oldest to newest by seqId. We rely on
|
||||||
|
* DefaultStoreFileManager to sort the files by seqId to guarantee contiguous compaction based
|
||||||
|
* on seqId for data consistency.
|
||||||
|
* @return subset copy of candidate list that meets compaction criteria
|
||||||
|
*/
|
||||||
|
public CompactionRequest selectCompaction(Collection<StoreFile> candidateFiles,
|
||||||
|
final List<StoreFile> filesCompacting, final boolean isUserCompaction,
|
||||||
|
final boolean mayUseOffPeak, final boolean forceMajor) throws IOException {
|
||||||
|
// Preliminary compaction subject to filters
|
||||||
|
ArrayList<StoreFile> candidateSelection = new ArrayList<StoreFile>(candidateFiles);
|
||||||
|
// Stuck and not compacting enough (estimate). It is not guaranteed that we will be
|
||||||
|
// able to compact more if stuck and compacting, because ratio policy excludes some
|
||||||
|
// non-compacting files from consideration during compaction (see getCurrentEligibleFiles).
|
||||||
|
int futureFiles = filesCompacting.isEmpty() ? 0 : 1;
|
||||||
|
boolean mayBeStuck = (candidateFiles.size() - filesCompacting.size() + futureFiles)
|
||||||
|
>= storeConfigInfo.getBlockingFileCount();
|
||||||
|
|
||||||
|
candidateSelection = getCurrentEligibleFiles(candidateSelection, filesCompacting);
|
||||||
|
LOG.debug("Selecting compaction from " + candidateFiles.size() + " store files, " +
|
||||||
|
filesCompacting.size() + " compacting, " + candidateSelection.size() +
|
||||||
|
" eligible, " + storeConfigInfo.getBlockingFileCount() + " blocking");
|
||||||
|
|
||||||
|
// If we can't have all files, we cannot do major anyway
|
||||||
|
boolean isAllFiles = candidateFiles.size() == candidateSelection.size();
|
||||||
|
if (!(forceMajor && isAllFiles)) {
|
||||||
|
candidateSelection = skipLargeFiles(candidateSelection, mayUseOffPeak);
|
||||||
|
isAllFiles = candidateFiles.size() == candidateSelection.size();
|
||||||
|
}
|
||||||
|
|
||||||
|
// Try a major compaction if this is a user-requested major compaction,
|
||||||
|
// or if we do not have too many files to compact and this was requested as a major compaction
|
||||||
|
boolean isTryingMajor = (forceMajor && isAllFiles && isUserCompaction)
|
||||||
|
|| (((forceMajor && isAllFiles) || shouldPerformMajorCompaction(candidateSelection))
|
||||||
|
&& (candidateSelection.size() < comConf.getMaxFilesToCompact()));
|
||||||
|
// Or, if there are any references among the candidates.
|
||||||
|
boolean isAfterSplit = StoreUtils.hasReferences(candidateSelection);
|
||||||
|
|
||||||
|
CompactionRequest result = createCompactionRequest(candidateSelection,
|
||||||
|
isTryingMajor || isAfterSplit, mayUseOffPeak, mayBeStuck);
|
||||||
|
|
||||||
|
ArrayList<StoreFile> filesToCompact = Lists.newArrayList(result.getFiles());
|
||||||
|
removeExcessFiles(filesToCompact, isUserCompaction, isTryingMajor);
|
||||||
|
result.updateFiles(filesToCompact);
|
||||||
|
|
||||||
|
isAllFiles = (candidateFiles.size() == filesToCompact.size());
|
||||||
|
result.setOffPeak(!filesToCompact.isEmpty() && !isAllFiles && mayUseOffPeak);
|
||||||
|
result.setIsMajor(isTryingMajor && isAllFiles, isAllFiles);
|
||||||
|
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected abstract CompactionRequest createCompactionRequest(ArrayList<StoreFile>
|
||||||
|
candidateSelection, boolean tryingMajor, boolean mayUseOffPeak, boolean mayBeStuck)
|
||||||
|
throws IOException;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* @param filesToCompact Files to compact. Can be null.
|
||||||
|
* @return True if we should run a major compaction.
|
||||||
|
*/
|
||||||
|
public abstract boolean shouldPerformMajorCompaction(final Collection<StoreFile> filesToCompact)
|
||||||
|
throws IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Used calculation jitter
|
||||||
|
*/
|
||||||
|
private final Random random = new Random();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param filesToCompact
|
||||||
|
* @return When to run next major compaction
|
||||||
|
*/
|
||||||
|
public long getNextMajorCompactTime(final Collection<StoreFile> filesToCompact) {
|
||||||
|
// default = 24hrs
|
||||||
|
long ret = comConf.getMajorCompactionPeriod();
|
||||||
|
if (ret > 0) {
|
||||||
|
// default = 20% = +/- 4.8 hrs
|
||||||
|
double jitterPct = comConf.getMajorCompactionJitter();
|
||||||
|
if (jitterPct > 0) {
|
||||||
|
long jitter = Math.round(ret * jitterPct);
|
||||||
|
// deterministic jitter avoids a major compaction storm on restart
|
||||||
|
Integer seed = StoreUtils.getDeterministicRandomSeed(filesToCompact);
|
||||||
|
if (seed != null) {
|
||||||
|
// Synchronized to ensure one user of random instance at a time.
|
||||||
|
double rnd = -1;
|
||||||
|
synchronized (this) {
|
||||||
|
this.random.setSeed(seed);
|
||||||
|
rnd = this.random.nextDouble();
|
||||||
|
}
|
||||||
|
ret += jitter - Math.round(2L * jitter * rnd);
|
||||||
|
} else {
|
||||||
|
ret = 0; // If seed is null, then no storefiles == no major compaction
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param compactionSize Total size of some compaction
|
||||||
|
* @return whether this should be a large or small compaction
|
||||||
|
*/
|
||||||
|
public boolean throttleCompaction(long compactionSize) {
|
||||||
|
return compactionSize > comConf.getThrottlePoint();
|
||||||
|
}
|
||||||
|
|
||||||
|
public abstract boolean needsCompaction(final Collection<StoreFile> storeFiles,
|
||||||
|
final List<StoreFile> filesCompacting);
|
||||||
|
|
||||||
|
protected ArrayList<StoreFile> getCurrentEligibleFiles(ArrayList<StoreFile> candidateFiles,
|
||||||
|
final List<StoreFile> filesCompacting) {
|
||||||
|
// candidates = all storefiles not already in compaction queue
|
||||||
|
if (!filesCompacting.isEmpty()) {
|
||||||
|
// exclude all files older than the newest file we're currently
|
||||||
|
// compacting. this allows us to preserve contiguity (HBASE-2856)
|
||||||
|
StoreFile last = filesCompacting.get(filesCompacting.size() - 1);
|
||||||
|
int idx = candidateFiles.indexOf(last);
|
||||||
|
Preconditions.checkArgument(idx != -1);
|
||||||
|
candidateFiles.subList(0, idx + 1).clear();
|
||||||
|
}
|
||||||
|
return candidateFiles;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param candidates pre-filtrate
|
||||||
|
* @return filtered subset exclude all files above maxCompactSize
|
||||||
|
* Also save all references. We MUST compact them
|
||||||
|
*/
|
||||||
|
protected ArrayList<StoreFile> skipLargeFiles(ArrayList<StoreFile> candidates,
|
||||||
|
boolean mayUseOffpeak) {
|
||||||
|
int pos = 0;
|
||||||
|
while (pos < candidates.size() && !candidates.get(pos).isReference()
|
||||||
|
&& (candidates.get(pos).getReader().length() > comConf.getMaxCompactSize(mayUseOffpeak))) {
|
||||||
|
++pos;
|
||||||
|
}
|
||||||
|
if (pos > 0) {
|
||||||
|
LOG.debug("Some files are too large. Excluding " + pos
|
||||||
|
+ " files from compaction candidates");
|
||||||
|
candidates.subList(0, pos).clear();
|
||||||
|
}
|
||||||
|
return candidates;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param candidates pre-filtrate
|
||||||
|
* @return filtered subset exclude all bulk load files if configured
|
||||||
|
*/
|
||||||
|
protected ArrayList<StoreFile> filterBulk(ArrayList<StoreFile> candidates) {
|
||||||
|
candidates.removeAll(Collections2.filter(candidates, new Predicate<StoreFile>() {
|
||||||
|
@Override
|
||||||
|
public boolean apply(StoreFile input) {
|
||||||
|
return input.excludeFromMinorCompaction();
|
||||||
|
}
|
||||||
|
}));
|
||||||
|
return candidates;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param candidates pre-filtrate
|
||||||
|
*/
|
||||||
|
protected void removeExcessFiles(ArrayList<StoreFile> candidates,
|
||||||
|
boolean isUserCompaction, boolean isMajorCompaction) {
|
||||||
|
int excess = candidates.size() - comConf.getMaxFilesToCompact();
|
||||||
|
if (excess > 0) {
|
||||||
|
if (isMajorCompaction && isUserCompaction) {
|
||||||
|
LOG.debug("Warning, compacting more than " + comConf.getMaxFilesToCompact()
|
||||||
|
+ " files because of a user-requested major compaction");
|
||||||
|
} else {
|
||||||
|
LOG.debug("Too many admissible files. Excluding " + excess
|
||||||
|
+ " files from compaction candidates");
|
||||||
|
candidates.subList(comConf.getMaxFilesToCompact(), candidates.size()).clear();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param candidates pre-filtrate
|
||||||
|
* @return filtered subset forget the compactionSelection if we don't have enough files
|
||||||
|
*/
|
||||||
|
protected ArrayList<StoreFile> checkMinFilesCriteria(ArrayList<StoreFile> candidates,
|
||||||
|
int minFiles) {
|
||||||
|
if (candidates.size() < minFiles) {
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("Not compacting files because we only have " + candidates.size()
|
||||||
|
+ " files ready for compaction. Need " + minFiles + " to initiate.");
|
||||||
|
}
|
||||||
|
candidates.clear();
|
||||||
|
}
|
||||||
|
return candidates;
|
||||||
|
}
|
||||||
|
}
|
|
@ -166,7 +166,8 @@ public class StripeCompactionPolicy extends CompactionPolicy {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean isMajorCompaction(Collection<StoreFile> filesToCompact) throws IOException {
|
public boolean shouldPerformMajorCompaction(Collection<StoreFile> filesToCompact)
|
||||||
|
throws IOException {
|
||||||
return false; // there's never a major compaction!
|
return false; // there's never a major compaction!
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -25,8 +25,11 @@ import java.util.TreeMap;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||||
import org.apache.hadoop.hbase.KeyValue.KVComparator;
|
import org.apache.hadoop.hbase.KeyValue.KVComparator;
|
||||||
|
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
|
||||||
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
|
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||||
|
|
||||||
/** A mock used so our tests don't deal with actual StoreFiles */
|
/** A mock used so our tests don't deal with actual StoreFiles */
|
||||||
public class MockStoreFile extends StoreFile {
|
public class MockStoreFile extends StoreFile {
|
||||||
|
@ -38,6 +41,9 @@ public class MockStoreFile extends StoreFile {
|
||||||
byte[] splitPoint = null;
|
byte[] splitPoint = null;
|
||||||
TimeRangeTracker timeRangeTracker;
|
TimeRangeTracker timeRangeTracker;
|
||||||
long entryCount;
|
long entryCount;
|
||||||
|
boolean isMajor;
|
||||||
|
HDFSBlocksDistribution hdfsBlocksDistribution;
|
||||||
|
long modificationTime;
|
||||||
|
|
||||||
MockStoreFile(HBaseTestingUtility testUtil, Path testPath,
|
MockStoreFile(HBaseTestingUtility testUtil, Path testPath,
|
||||||
long length, long ageInDisk, boolean isRef, long sequenceid) throws IOException {
|
long length, long ageInDisk, boolean isRef, long sequenceid) throws IOException {
|
||||||
|
@ -47,6 +53,11 @@ public class MockStoreFile extends StoreFile {
|
||||||
this.isRef = isRef;
|
this.isRef = isRef;
|
||||||
this.ageInDisk = ageInDisk;
|
this.ageInDisk = ageInDisk;
|
||||||
this.sequenceid = sequenceid;
|
this.sequenceid = sequenceid;
|
||||||
|
this.isMajor = false;
|
||||||
|
hdfsBlocksDistribution = new HDFSBlocksDistribution();
|
||||||
|
hdfsBlocksDistribution.addHostsAndBlockWeight(
|
||||||
|
new String[] { RSRpcServices.getHostname(testUtil.getConfiguration(), false) }, 1);
|
||||||
|
modificationTime = EnvironmentEdgeManager.currentTime();
|
||||||
}
|
}
|
||||||
|
|
||||||
void setLength(long newLen) {
|
void setLength(long newLen) {
|
||||||
|
@ -65,7 +76,11 @@ public class MockStoreFile extends StoreFile {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean isMajorCompaction() {
|
public boolean isMajorCompaction() {
|
||||||
return false;
|
return isMajor;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setIsMajor(boolean isMajor) {
|
||||||
|
this.isMajor = isMajor;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -74,12 +89,7 @@ public class MockStoreFile extends StoreFile {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
boolean isBulkLoadResult() {
|
public boolean isBulkLoadResult() {
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean isCompactedAway() {
|
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -102,14 +112,22 @@ public class MockStoreFile extends StoreFile {
|
||||||
|
|
||||||
public Long getMinimumTimestamp() {
|
public Long getMinimumTimestamp() {
|
||||||
return (timeRangeTracker == null) ?
|
return (timeRangeTracker == null) ?
|
||||||
null :
|
null : timeRangeTracker.getMinimumTimestamp();
|
||||||
timeRangeTracker.getMinimumTimestamp();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public Long getMaximumTimestamp() {
|
public Long getMaximumTimestamp() {
|
||||||
return (timeRangeTracker == null) ?
|
return (timeRangeTracker == null) ?
|
||||||
null :
|
null : timeRangeTracker.getMaximumTimestamp();
|
||||||
timeRangeTracker.getMaximumTimestamp();
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getModificationTimeStamp() {
|
||||||
|
return modificationTime;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public HDFSBlocksDistribution getHDFSBlockDistribution() {
|
||||||
|
return hdfsBlocksDistribution;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -1,5 +1,4 @@
|
||||||
/**
|
/**
|
||||||
*
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
* or more contributor license agreements. See the NOTICE file
|
* or more contributor license agreements. See the NOTICE file
|
||||||
* distributed with this work for additional information
|
* distributed with this work for additional information
|
||||||
|
@ -26,17 +25,25 @@ import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
|
import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
|
||||||
import org.apache.hadoop.hbase.regionserver.compactions.DateTieredCompactionPolicy;
|
import org.apache.hadoop.hbase.regionserver.compactions.DateTieredCompactionPolicy;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.compactions.DateTieredCompactionRequest;
|
||||||
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||||
|
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||||
|
import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.junit.experimental.categories.Category;
|
import org.junit.experimental.categories.Category;
|
||||||
|
|
||||||
@Category(SmallTests.class)
|
@Category(SmallTests.class)
|
||||||
public class TestDateTieredCompaction extends TestCompactionPolicy {
|
public class TestDateTieredCompactionPolicy extends TestCompactionPolicy {
|
||||||
ArrayList<StoreFile> sfCreate(long[] minTimestamps, long[] maxTimestamps, long[] sizes)
|
ArrayList<StoreFile> sfCreate(long[] minTimestamps, long[] maxTimestamps, long[] sizes)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
|
ManualEnvironmentEdge timeMachine = new ManualEnvironmentEdge();
|
||||||
|
EnvironmentEdgeManager.injectEdge(timeMachine);
|
||||||
|
// Has to be > 0 and < now.
|
||||||
|
timeMachine.setValue(1);
|
||||||
ArrayList<Long> ageInDisk = new ArrayList<Long>();
|
ArrayList<Long> ageInDisk = new ArrayList<Long>();
|
||||||
for (int i = 0; i < sizes.length; i++) {
|
for (int i = 0; i < sizes.length; i++) {
|
||||||
ageInDisk.add(0L);
|
ageInDisk.add(0L);
|
||||||
|
@ -57,29 +64,47 @@ public class TestDateTieredCompaction extends TestCompactionPolicy {
|
||||||
super.config();
|
super.config();
|
||||||
|
|
||||||
// Set up policy
|
// Set up policy
|
||||||
|
conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY,
|
||||||
|
"org.apache.hadoop.hbase.regionserver.DateTieredStoreEngine");
|
||||||
conf.setLong(CompactionConfiguration.MAX_AGE_MILLIS_KEY, 100);
|
conf.setLong(CompactionConfiguration.MAX_AGE_MILLIS_KEY, 100);
|
||||||
conf.setLong(CompactionConfiguration.INCOMING_WINDOW_MIN_KEY, 3);
|
conf.setLong(CompactionConfiguration.INCOMING_WINDOW_MIN_KEY, 3);
|
||||||
conf.setLong(CompactionConfiguration.BASE_WINDOW_MILLIS_KEY, 6);
|
conf.setLong(CompactionConfiguration.BASE_WINDOW_MILLIS_KEY, 6);
|
||||||
conf.setInt(CompactionConfiguration.WINDOWS_PER_TIER_KEY, 4);
|
conf.setInt(CompactionConfiguration.WINDOWS_PER_TIER_KEY, 4);
|
||||||
conf.set(DefaultStoreEngine.DEFAULT_COMPACTION_POLICY_CLASS_KEY,
|
conf.setBoolean(CompactionConfiguration.SINGLE_OUTPUT_FOR_MINOR_COMPACTION_KEY, false);
|
||||||
DateTieredCompactionPolicy.class.getName());
|
|
||||||
|
|
||||||
// Special settings for compaction policy per window
|
// Special settings for compaction policy per window
|
||||||
this.conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 2);
|
this.conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 2);
|
||||||
this.conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY, 12);
|
this.conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY, 12);
|
||||||
this.conf.setFloat(CompactionConfiguration.HBASE_HSTORE_COMPACTION_RATIO_KEY, 1.2F);
|
this.conf.setFloat(CompactionConfiguration.HBASE_HSTORE_COMPACTION_RATIO_KEY, 1.2F);
|
||||||
|
|
||||||
|
conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 20);
|
||||||
|
conf.setLong(HConstants.MAJOR_COMPACTION_PERIOD, 10);
|
||||||
}
|
}
|
||||||
|
|
||||||
void compactEquals(long now, ArrayList<StoreFile> candidates, long... expected)
|
void compactEquals(long now, ArrayList<StoreFile> candidates, long[] expectedFileSizes,
|
||||||
throws IOException {
|
long[] expectedBoundaries, boolean isMajor, boolean toCompact) throws IOException {
|
||||||
Assert.assertTrue(((DateTieredCompactionPolicy) store.storeEngine.getCompactionPolicy())
|
ManualEnvironmentEdge timeMachine = new ManualEnvironmentEdge();
|
||||||
.needsCompaction(candidates, ImmutableList.<StoreFile> of(), now));
|
EnvironmentEdgeManager.injectEdge(timeMachine);
|
||||||
|
timeMachine.setValue(now);
|
||||||
List<StoreFile> actual =
|
DateTieredCompactionRequest request;
|
||||||
((DateTieredCompactionPolicy) store.storeEngine.getCompactionPolicy())
|
if (isMajor) {
|
||||||
.applyCompactionPolicy(candidates, false, false, now);
|
for (StoreFile file : candidates) {
|
||||||
|
((MockStoreFile)file).setIsMajor(true);
|
||||||
Assert.assertEquals(Arrays.toString(expected), Arrays.toString(getSizes(actual)));
|
}
|
||||||
|
Assert.assertEquals(toCompact, ((DateTieredCompactionPolicy) store.storeEngine.getCompactionPolicy())
|
||||||
|
.shouldPerformMajorCompaction(candidates));
|
||||||
|
request = (DateTieredCompactionRequest) ((DateTieredCompactionPolicy) store.storeEngine
|
||||||
|
.getCompactionPolicy()).selectMajorCompaction(candidates);
|
||||||
|
} else {
|
||||||
|
Assert.assertEquals(toCompact, ((DateTieredCompactionPolicy) store.storeEngine.getCompactionPolicy())
|
||||||
|
.needsCompaction(candidates, ImmutableList.<StoreFile> of()));
|
||||||
|
request = (DateTieredCompactionRequest) ((DateTieredCompactionPolicy) store.storeEngine
|
||||||
|
.getCompactionPolicy()).selectMinorCompaction(candidates, false, false);
|
||||||
|
}
|
||||||
|
List<StoreFile> actual = Lists.newArrayList(request.getFiles());
|
||||||
|
Assert.assertEquals(Arrays.toString(expectedFileSizes), Arrays.toString(getSizes(actual)));
|
||||||
|
Assert.assertEquals(Arrays.toString(expectedBoundaries),
|
||||||
|
Arrays.toString(request.getBoundaries().toArray()));
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -92,7 +117,8 @@ public class TestDateTieredCompaction extends TestCompactionPolicy {
|
||||||
long[] maxTimestamps = new long[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15 };
|
long[] maxTimestamps = new long[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15 };
|
||||||
long[] sizes = new long[] { 30, 31, 32, 33, 34, 20, 21, 22, 23, 24, 25, 10, 11, 12, 13 };
|
long[] sizes = new long[] { 30, 31, 32, 33, 34, 20, 21, 22, 23, 24, 25, 10, 11, 12, 13 };
|
||||||
|
|
||||||
compactEquals(16, sfCreate(minTimestamps, maxTimestamps, sizes), 13, 12, 11, 10);
|
compactEquals(16, sfCreate(minTimestamps, maxTimestamps, sizes), new long[] { 10, 11, 12, 13 },
|
||||||
|
new long[] { Long.MIN_VALUE, 12 }, false, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -105,7 +131,22 @@ public class TestDateTieredCompaction extends TestCompactionPolicy {
|
||||||
long[] maxTimestamps = new long[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13 };
|
long[] maxTimestamps = new long[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13 };
|
||||||
long[] sizes = new long[] { 30, 31, 32, 33, 34, 20, 21, 22, 23, 24, 25, 10, 11 };
|
long[] sizes = new long[] { 30, 31, 32, 33, 34, 20, 21, 22, 23, 24, 25, 10, 11 };
|
||||||
|
|
||||||
compactEquals(16, sfCreate(minTimestamps, maxTimestamps, sizes), 25, 24, 23, 22, 21, 20);
|
compactEquals(16, sfCreate(minTimestamps, maxTimestamps, sizes), new long[] { 20, 21, 22, 23,
|
||||||
|
24, 25 }, new long[] { Long.MIN_VALUE, 6}, false, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test for file on the upper bound of incoming window
|
||||||
|
* @throws IOException with error
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void OnUpperBoundOfIncomingWindow() throws IOException {
|
||||||
|
long[] minTimestamps = new long[] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 };
|
||||||
|
long[] maxTimestamps = new long[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 18 };
|
||||||
|
long[] sizes = new long[] { 30, 31, 32, 33, 34, 20, 21, 22, 23, 24, 25, 10, 11, 12, 13 };
|
||||||
|
|
||||||
|
compactEquals(16, sfCreate(minTimestamps, maxTimestamps, sizes), new long[] { 10, 11, 12, 13 },
|
||||||
|
new long[] { Long.MIN_VALUE, 12 }, false, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -115,23 +156,25 @@ public class TestDateTieredCompaction extends TestCompactionPolicy {
|
||||||
@Test
|
@Test
|
||||||
public void NewerThanIncomingWindow() throws IOException {
|
public void NewerThanIncomingWindow() throws IOException {
|
||||||
long[] minTimestamps = new long[] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 };
|
long[] minTimestamps = new long[] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 };
|
||||||
long[] maxTimestamps = new long[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 18 };
|
long[] maxTimestamps = new long[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 19 };
|
||||||
long[] sizes = new long[] { 30, 31, 32, 33, 34, 20, 21, 22, 23, 24, 25, 10, 11, 12, 13 };
|
long[] sizes = new long[] { 30, 31, 32, 33, 34, 20, 21, 22, 23, 24, 25, 10, 11, 12, 13 };
|
||||||
|
|
||||||
compactEquals(16, sfCreate(minTimestamps, maxTimestamps, sizes), 13, 12, 11, 10);
|
compactEquals(16, sfCreate(minTimestamps, maxTimestamps, sizes), new long[] { 10, 11, 12, 13 },
|
||||||
|
new long[] { Long.MIN_VALUE, 12}, false, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* If there is no T1 window, we don't build 2
|
* If there is no T1 window, we don't build T2
|
||||||
* @throws IOException with error
|
* @throws IOException with error
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
public void NoT2() throws IOException {
|
public void NoT2() throws IOException {
|
||||||
long[] minTimestamps = new long[] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 };
|
long[] minTimestamps = new long[] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 };
|
||||||
long[] maxTimestamps = new long[] { 44, 60, 61, 92, 95, 100 };
|
long[] maxTimestamps = new long[] { 44, 60, 61, 97, 100, 193 };
|
||||||
long[] sizes = new long[] { 0, 20, 21, 22, 23, 1 };
|
long[] sizes = new long[] { 0, 20, 21, 22, 23, 1 };
|
||||||
|
|
||||||
compactEquals(100, sfCreate(minTimestamps, maxTimestamps, sizes), 23, 22);
|
compactEquals(194, sfCreate(minTimestamps, maxTimestamps, sizes), new long[] { 22, 23 },
|
||||||
|
new long[] { Long.MIN_VALUE, 96}, false, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -140,7 +183,8 @@ public class TestDateTieredCompaction extends TestCompactionPolicy {
|
||||||
long[] maxTimestamps = new long[] { 44, 60, 61, 96, 100, 104, 120, 124, 143, 145, 157 };
|
long[] maxTimestamps = new long[] { 44, 60, 61, 96, 100, 104, 120, 124, 143, 145, 157 };
|
||||||
long[] sizes = new long[] { 0, 50, 51, 40, 41, 42, 30, 31, 32, 2, 1 };
|
long[] sizes = new long[] { 0, 50, 51, 40, 41, 42, 30, 31, 32, 2, 1 };
|
||||||
|
|
||||||
compactEquals(161, sfCreate(minTimestamps, maxTimestamps, sizes), 32, 31, 30);
|
compactEquals(161, sfCreate(minTimestamps, maxTimestamps, sizes), new long[] { 30, 31, 32 },
|
||||||
|
new long[] { Long.MIN_VALUE, 120 }, false, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -153,7 +197,8 @@ public class TestDateTieredCompaction extends TestCompactionPolicy {
|
||||||
long[] maxTimestamps = new long[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12 };
|
long[] maxTimestamps = new long[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12 };
|
||||||
long[] sizes = new long[] { 30, 31, 32, 33, 34, 20, 21, 22, 280, 23, 24, 1 };
|
long[] sizes = new long[] { 30, 31, 32, 33, 34, 20, 21, 22, 280, 23, 24, 1 };
|
||||||
|
|
||||||
compactEquals(16, sfCreate(minTimestamps, maxTimestamps, sizes), 22, 21, 20);
|
compactEquals(16, sfCreate(minTimestamps, maxTimestamps, sizes), new long[] { 20, 21, 22 },
|
||||||
|
new long[] { Long.MIN_VALUE }, false, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -166,7 +211,8 @@ public class TestDateTieredCompaction extends TestCompactionPolicy {
|
||||||
long[] maxTimestamps = new long[] { 44, 60, 61, 96, 100, 104, 120, 124, 143, 145, 157 };
|
long[] maxTimestamps = new long[] { 44, 60, 61, 96, 100, 104, 120, 124, 143, 145, 157 };
|
||||||
long[] sizes = new long[] { 0, 50, 51, 40, 41, 42, 350, 30, 31, 2, 1 };
|
long[] sizes = new long[] { 0, 50, 51, 40, 41, 42, 350, 30, 31, 2, 1 };
|
||||||
|
|
||||||
compactEquals(161, sfCreate(minTimestamps, maxTimestamps, sizes), 31, 30);
|
compactEquals(161, sfCreate(minTimestamps, maxTimestamps, sizes), new long[] { 30, 31 },
|
||||||
|
new long[] { Long.MIN_VALUE }, false, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -179,7 +225,8 @@ public class TestDateTieredCompaction extends TestCompactionPolicy {
|
||||||
long[] maxTimestamps = new long[] { 1, 2, 3, 4, 5, 8, 9, 10, 11, 12 };
|
long[] maxTimestamps = new long[] { 1, 2, 3, 4, 5, 8, 9, 10, 11, 12 };
|
||||||
long[] sizes = new long[] { 30, 31, 32, 33, 34, 22, 280, 23, 24, 1 };
|
long[] sizes = new long[] { 30, 31, 32, 33, 34, 22, 280, 23, 24, 1 };
|
||||||
|
|
||||||
compactEquals(16, sfCreate(minTimestamps, maxTimestamps, sizes), 24, 23);
|
compactEquals(16, sfCreate(minTimestamps, maxTimestamps, sizes), new long[] { 23, 24 },
|
||||||
|
new long[] { Long.MIN_VALUE }, false, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -192,7 +239,8 @@ public class TestDateTieredCompaction extends TestCompactionPolicy {
|
||||||
long[] maxTimestamps = new long[] { 44, 60, 61, 96, 100, 104, 105, 106, 113, 145, 157 };
|
long[] maxTimestamps = new long[] { 44, 60, 61, 96, 100, 104, 105, 106, 113, 145, 157 };
|
||||||
long[] sizes = new long[] { 0, 50, 51, 40, 41, 42, 33, 30, 31, 2, 1 };
|
long[] sizes = new long[] { 0, 50, 51, 40, 41, 42, 33, 30, 31, 2, 1 };
|
||||||
|
|
||||||
compactEquals(161, sfCreate(minTimestamps, maxTimestamps, sizes), 31, 30, 33, 42, 41, 40);
|
compactEquals(161, sfCreate(minTimestamps, maxTimestamps, sizes), new long[] { 40, 41, 42, 33,
|
||||||
|
30, 31 }, new long[] { Long.MIN_VALUE, 96 }, false, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -200,12 +248,78 @@ public class TestDateTieredCompaction extends TestCompactionPolicy {
|
||||||
* @throws IOException with error
|
* @throws IOException with error
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
public void OutOfOrder() throws IOException {
|
public void outOfOrder() throws IOException {
|
||||||
long[] minTimestamps = new long[] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 };
|
long[] minTimestamps = new long[] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 };
|
||||||
long[] maxTimestamps = new long[] { 0, 13, 3, 10, 11, 1, 2, 12, 14, 15 };
|
long[] maxTimestamps = new long[] { 0, 13, 3, 10, 11, 1, 2, 12, 14, 15 };
|
||||||
long[] sizes = new long[] { 30, 31, 32, 33, 34, 22, 28, 23, 24, 1 };
|
long[] sizes = new long[] { 30, 31, 32, 33, 34, 22, 28, 23, 24, 1 };
|
||||||
|
|
||||||
compactEquals(16, sfCreate(minTimestamps, maxTimestamps, sizes), 1, 24, 23, 28, 22, 34, 33, 32,
|
compactEquals(16, sfCreate(minTimestamps, maxTimestamps, sizes), new long[] { 31, 32, 33, 34,
|
||||||
31);
|
22, 28, 23, 24, 1 }, new long[] { Long.MIN_VALUE, 12 }, false, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Negative epoch time
|
||||||
|
* @throws IOException with error
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void negativeEpochtime() throws IOException {
|
||||||
|
long[] minTimestamps =
|
||||||
|
new long[] { -1000, -1000, -1000, -1000, -1000, -1000, -1000, -1000, -1000, -1000 };
|
||||||
|
long[] maxTimestamps = new long[] { -28, -11, -10, -9, -8, -7, -6, -5, -4, -3 };
|
||||||
|
long[] sizes = new long[] { 30, 31, 32, 33, 34, 22, 25, 23, 24, 1 };
|
||||||
|
|
||||||
|
compactEquals(1, sfCreate(minTimestamps, maxTimestamps, sizes),
|
||||||
|
new long[] { 31, 32, 33, 34, 22, 25, 23, 24, 1 },
|
||||||
|
new long[] { Long.MIN_VALUE, -24 }, false, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Major compaction
|
||||||
|
* @throws IOException with error
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void majorCompation() throws IOException {
|
||||||
|
long[] minTimestamps = new long[] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 };
|
||||||
|
long[] maxTimestamps = new long[] { 44, 60, 61, 96, 100, 104, 105, 106, 113, 145, 157 };
|
||||||
|
long[] sizes = new long[] { 0, 50, 51, 40, 41, 42, 33, 30, 31, 2, 1 };
|
||||||
|
|
||||||
|
compactEquals(161, sfCreate(minTimestamps, maxTimestamps, sizes), new long[] { 0, 50, 51, 40,41, 42,
|
||||||
|
33, 30, 31, 2, 1 }, new long[] { Long.MIN_VALUE, 24, 48, 72, 96, 120, 144, 150, 156 }, true, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Major compaction with negative numbers
|
||||||
|
* @throws IOException with error
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void negativeForMajor() throws IOException {
|
||||||
|
long[] minTimestamps =
|
||||||
|
new long[] { -155, -100, -100, -100, -100, -100, -100, -100, -100, -100, -100 };
|
||||||
|
long[] maxTimestamps = new long[] { -8, -7, -6, -5, -4, -3, -2, -1, 0, 6, 13 };
|
||||||
|
long[] sizes = new long[] { 0, 50, 51, 40, 41, 42, 33, 30, 31, 2, 1 };
|
||||||
|
|
||||||
|
compactEquals(16, sfCreate(minTimestamps, maxTimestamps, sizes), new long[] { 0, 50, 51, 40,
|
||||||
|
41, 42, 33, 30, 31, 2, 1 },
|
||||||
|
new long[] { Long.MIN_VALUE, -144, -120, -96, -72, -48, -24, 0, 6, 12 }, true, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Major compaction with maximum values
|
||||||
|
* @throws IOException with error
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void maxValuesForMajor() throws IOException {
|
||||||
|
conf.setLong(CompactionConfiguration.BASE_WINDOW_MILLIS_KEY, Long.MAX_VALUE / 2);
|
||||||
|
conf.setInt(CompactionConfiguration.WINDOWS_PER_TIER_KEY, 2);
|
||||||
|
store.storeEngine.getCompactionPolicy().setConf(conf);
|
||||||
|
long[] minTimestamps =
|
||||||
|
new long[] { Long.MIN_VALUE, -100 };
|
||||||
|
long[] maxTimestamps = new long[] { -8, Long.MAX_VALUE };
|
||||||
|
long[] sizes = new long[] { 0, 1 };
|
||||||
|
|
||||||
|
compactEquals(Long.MAX_VALUE, sfCreate(minTimestamps, maxTimestamps, sizes),
|
||||||
|
new long[] { 0, 1 },
|
||||||
|
new long[] { Long.MIN_VALUE, -4611686018427387903L, 0, 4611686018427387903L,
|
||||||
|
9223372036854775806L }, true, true);
|
||||||
}
|
}
|
||||||
}
|
}
|
|
@ -42,7 +42,7 @@ public class EverythingPolicy extends RatioBasedCompactionPolicy {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
final ArrayList<StoreFile> applyCompactionPolicy(final ArrayList<StoreFile> candidates,
|
protected final ArrayList<StoreFile> applyCompactionPolicy(final ArrayList<StoreFile> candidates,
|
||||||
final boolean mayUseOffPeak, final boolean mayBeStuck) throws IOException {
|
final boolean mayUseOffPeak, final boolean mayBeStuck) throws IOException {
|
||||||
|
|
||||||
if (candidates.size() < comConf.getMinFilesToCompact()) {
|
if (candidates.size() < comConf.getMinFilesToCompact()) {
|
||||||
|
|
Loading…
Reference in New Issue