HBASE-7784 move the code related to compaction selection specific to default compaction policy, into default compaction policy (from HStore) (Sergey)
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1444298 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
df04846da5
commit
e3b688719c
|
@ -108,6 +108,8 @@ import com.google.common.collect.Lists;
|
|||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class HStore implements Store {
|
||||
public static final int DEFAULT_BLOCKING_STOREFILE_COUNT = 7;
|
||||
|
||||
static final Log LOG = LogFactory.getLog(HStore.class);
|
||||
|
||||
protected final MemStore memstore;
|
||||
|
@ -125,7 +127,6 @@ public class HStore implements Store {
|
|||
volatile boolean forceMajor = false;
|
||||
/* how many bytes to write between status checks */
|
||||
static int closeCheckInterval = 0;
|
||||
private final int blockingStoreFileCount;
|
||||
private volatile long storeSize = 0L;
|
||||
private volatile long totalUncompressedBytes = 0L;
|
||||
private final Object flushLock = new Object();
|
||||
|
@ -209,8 +210,7 @@ public class HStore implements Store {
|
|||
|
||||
// Setting up cache configuration for this family
|
||||
this.cacheConf = new CacheConfig(conf, family);
|
||||
this.blockingStoreFileCount =
|
||||
conf.getInt("hbase.hstore.blockingStoreFiles", 7);
|
||||
|
||||
|
||||
this.verifyBulkLoads = conf.getBoolean("hbase.hstore.bulkload.verify", false);
|
||||
|
||||
|
@ -1230,22 +1230,13 @@ public class HStore implements Store {
|
|||
CompactionRequest ret = null;
|
||||
this.lock.readLock().lock();
|
||||
try {
|
||||
List<StoreFile> candidates = Lists.newArrayList(storeFileManager.getStorefiles());
|
||||
synchronized (filesCompacting) {
|
||||
// candidates = all StoreFiles not already in compaction queue
|
||||
List<StoreFile> candidates = Lists.newArrayList(storeFileManager.getStorefiles());
|
||||
if (!filesCompacting.isEmpty()) {
|
||||
// exclude all files older than the newest file we're currently
|
||||
// compacting. this allows us to preserve contiguity (HBASE-2856)
|
||||
StoreFile last = filesCompacting.get(filesCompacting.size() - 1);
|
||||
int idx = candidates.indexOf(last);
|
||||
Preconditions.checkArgument(idx != -1);
|
||||
candidates.subList(0, idx + 1).clear();
|
||||
}
|
||||
|
||||
// First we need to pre-select compaction, and then pre-compact selection!
|
||||
candidates = compactionPolicy.preSelectCompaction(candidates, filesCompacting);
|
||||
boolean override = false;
|
||||
if (region.getCoprocessorHost() != null) {
|
||||
override = region.getCoprocessorHost().preCompactSelection(
|
||||
this, candidates);
|
||||
override = region.getCoprocessorHost().preCompactSelection(this, candidates);
|
||||
}
|
||||
CompactSelection filesToCompact;
|
||||
if (override) {
|
||||
|
@ -1739,12 +1730,12 @@ public class HStore implements Store {
|
|||
|
||||
@Override
|
||||
public int getCompactPriority(int priority) {
|
||||
// If this is a user-requested compaction, leave this at the highest priority
|
||||
if(priority == Store.PRIORITY_USER) {
|
||||
return Store.PRIORITY_USER;
|
||||
} else {
|
||||
return this.blockingStoreFileCount - this.storeFileManager.getStorefileCount();
|
||||
// If this is a user-requested compaction, leave this at the user priority
|
||||
if (priority != Store.PRIORITY_USER) {
|
||||
priority = this.compactionPolicy.getSystemCompactionPriority(
|
||||
this.storeFileManager.getStorefiles());
|
||||
}
|
||||
return priority;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -1855,8 +1846,7 @@ public class HStore implements Store {
|
|||
|
||||
@Override
|
||||
public boolean needsCompaction() {
|
||||
return compactionPolicy.needsCompaction(
|
||||
this.storeFileManager.getStorefileCount() - filesCompacting.size());
|
||||
return compactionPolicy.needsCompaction(this.storeFileManager.getStorefiles(), filesCompacting);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -1866,7 +1856,7 @@ public class HStore implements Store {
|
|||
|
||||
public static final long FIXED_OVERHEAD =
|
||||
ClassSize.align((20 * ClassSize.REFERENCE) + (4 * Bytes.SIZEOF_LONG)
|
||||
+ (3 * Bytes.SIZEOF_INT) + Bytes.SIZEOF_BOOLEAN);
|
||||
+ (2 * Bytes.SIZEOF_INT) + Bytes.SIZEOF_BOOLEAN);
|
||||
|
||||
public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD
|
||||
+ ClassSize.OBJECT + ClassSize.REENTRANT_LOCK
|
||||
|
|
|
@ -85,7 +85,7 @@ class MemStoreFlusher implements FlushRequester {
|
|||
"hbase.regionserver.global.memstore.upperLimit";
|
||||
private static final String LOWER_KEY =
|
||||
"hbase.regionserver.global.memstore.lowerLimit";
|
||||
private long blockingStoreFilesNumber;
|
||||
private int blockingStoreFileCount;
|
||||
private long blockingWaitTime;
|
||||
private final Counter updatesBlockedMsHighWater = new Counter();
|
||||
|
||||
|
@ -112,8 +112,8 @@ class MemStoreFlusher implements FlushRequester {
|
|||
"because supplied " + LOWER_KEY + " was > " + UPPER_KEY);
|
||||
}
|
||||
this.globalMemStoreLimitLowMark = lower;
|
||||
this.blockingStoreFilesNumber =
|
||||
conf.getInt("hbase.hstore.blockingStoreFiles", 7);
|
||||
this.blockingStoreFileCount =
|
||||
conf.getInt("hbase.hstore.blockingStoreFiles", HStore.DEFAULT_BLOCKING_STOREFILE_COUNT);
|
||||
this.blockingWaitTime = conf.getInt("hbase.hstore.blockingWaitTime",
|
||||
90000);
|
||||
this.handlerCount = conf.getInt("hbase.hstore.flusher.count", 1);
|
||||
|
@ -482,7 +482,7 @@ class MemStoreFlusher implements FlushRequester {
|
|||
|
||||
private boolean isTooManyStoreFiles(HRegion region) {
|
||||
for (Store hstore : region.stores.values()) {
|
||||
if (hstore.getStorefilesCount() > this.blockingStoreFilesNumber) {
|
||||
if (hstore.getStorefilesCount() > this.blockingStoreFileCount) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -24,6 +24,7 @@ import org.apache.commons.logging.LogFactory;
|
|||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.regionserver.HStore;
|
||||
import org.apache.hadoop.hbase.regionserver.StoreConfigInformation;
|
||||
|
||||
/**
|
||||
|
@ -63,6 +64,7 @@ public class CompactionConfiguration {
|
|||
boolean shouldDeleteExpired;
|
||||
long majorCompactionPeriod;
|
||||
float majorCompactionJitter;
|
||||
int blockingStoreFileCount;
|
||||
|
||||
CompactionConfiguration(Configuration conf, StoreConfigInformation storeConfigInfo) {
|
||||
this.conf = conf;
|
||||
|
@ -93,6 +95,8 @@ public class CompactionConfiguration {
|
|||
shouldDeleteExpired = conf.getBoolean("hbase.store.delete.expired.storefile", true);
|
||||
majorCompactionPeriod = conf.getLong(HConstants.MAJOR_COMPACTION_PERIOD, 1000*60*60*24);
|
||||
majorCompactionJitter = conf.getFloat("hbase.hregion.majorcompaction.jitter", 0.20F);
|
||||
blockingStoreFileCount =
|
||||
conf.getInt("hbase.hstore.blockingStoreFiles", HStore.DEFAULT_BLOCKING_STOREFILE_COUNT);
|
||||
|
||||
LOG.info("Compaction configuration " + this.toString());
|
||||
}
|
||||
|
@ -116,6 +120,13 @@ public class CompactionConfiguration {
|
|||
majorCompactionJitter);
|
||||
}
|
||||
|
||||
/**
|
||||
* @return store file count that will cause the memstore of this store to be blocked.
|
||||
*/
|
||||
int getBlockingStorefileCount() {
|
||||
return this.blockingStoreFileCount;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return lower bound below which compaction is selected without ratio test
|
||||
*/
|
||||
|
@ -205,4 +216,4 @@ public class CompactionConfiguration {
|
|||
private static boolean isValidHour(int hour) {
|
||||
return (hour >= 0 && hour <= 23);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -53,6 +53,16 @@ public abstract class CompactionPolicy extends Configured {
|
|||
Compactor compactor;
|
||||
HStore store;
|
||||
|
||||
/**
|
||||
* This is called before coprocessor preCompactSelection and should filter the candidates
|
||||
* for coprocessor; i.e. exclude the files that definitely cannot be compacted at this time.
|
||||
* @param candidateFiles candidate files, ordered from oldest to newest
|
||||
* @param filesCompacting files currently compacting
|
||||
* @return the list of files that can theoretically be compacted.
|
||||
*/
|
||||
public abstract List<StoreFile> preSelectCompaction(
|
||||
List<StoreFile> candidateFiles, final List<StoreFile> filesCompacting);
|
||||
|
||||
/**
|
||||
* @param candidateFiles candidate files, ordered from oldest to newest
|
||||
* @return subset copy of candidate list that meets compaction criteria
|
||||
|
@ -62,6 +72,16 @@ public abstract class CompactionPolicy extends Configured {
|
|||
final List<StoreFile> candidateFiles, final boolean isUserCompaction,
|
||||
final boolean forceMajor) throws IOException;
|
||||
|
||||
/**
|
||||
* @param storeFiles Store files in the store.
|
||||
* @return The system compaction priority of the store, based on storeFiles.
|
||||
* The priority range is as such - the smaller values are higher priority;
|
||||
* 1 is user priority; only very important, blocking compactions should use
|
||||
* values lower than that. With default settings, depending on the number of
|
||||
* store files, the non-blocking priority will be in 2-6 range.
|
||||
*/
|
||||
public abstract int getSystemCompactionPriority(final Collection<StoreFile> storeFiles);
|
||||
|
||||
/**
|
||||
* @param filesToCompact Files to compact. Can be null.
|
||||
* @return True if we should run a major compaction.
|
||||
|
@ -76,10 +96,12 @@ public abstract class CompactionPolicy extends Configured {
|
|||
public abstract boolean throttleCompaction(long compactionSize);
|
||||
|
||||
/**
|
||||
* @param numCandidates Number of candidate store files
|
||||
* @param storeFiles Current store files.
|
||||
* @param filesCompacting files currently compacting.
|
||||
* @return whether a compactionSelection is possible
|
||||
*/
|
||||
public abstract boolean needsCompaction(int numCandidates);
|
||||
public abstract boolean needsCompaction(final Collection<StoreFile> storeFiles,
|
||||
final List<StoreFile> filesCompacting);
|
||||
|
||||
/**
|
||||
* Inform the policy that some configuration has been change,
|
||||
|
|
|
@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.regionserver.StoreFile;
|
|||
import org.apache.hadoop.hbase.regionserver.StoreUtils;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.base.Predicate;
|
||||
import com.google.common.collect.Collections2;
|
||||
|
||||
|
@ -53,6 +54,26 @@ public class DefaultCompactionPolicy extends CompactionPolicy {
|
|||
compactor = new DefaultCompactor(this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<StoreFile> preSelectCompaction(
|
||||
List<StoreFile> candidateFiles, final List<StoreFile> filesCompacting) {
|
||||
// candidates = all storefiles not already in compaction queue
|
||||
if (!filesCompacting.isEmpty()) {
|
||||
// exclude all files older than the newest file we're currently
|
||||
// compacting. this allows us to preserve contiguity (HBASE-2856)
|
||||
StoreFile last = filesCompacting.get(filesCompacting.size() - 1);
|
||||
int idx = candidateFiles.indexOf(last);
|
||||
Preconditions.checkArgument(idx != -1);
|
||||
candidateFiles.subList(0, idx + 1).clear();
|
||||
}
|
||||
return candidateFiles;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getSystemCompactionPriority(final Collection<StoreFile> storeFiles) {
|
||||
return this.comConf.getBlockingStorefileCount() - storeFiles.size();
|
||||
}
|
||||
|
||||
/**
|
||||
* @param candidateFiles candidate files, ordered from oldest to newest
|
||||
* @return subset copy of candidate list that meets compaction criteria
|
||||
|
@ -367,11 +388,10 @@ public class DefaultCompactionPolicy extends CompactionPolicy {
|
|||
return compactionSize > comConf.getThrottlePoint();
|
||||
}
|
||||
|
||||
/**
|
||||
* @param numCandidates Number of candidate store files
|
||||
* @return whether a compactionSelection is possible
|
||||
*/
|
||||
public boolean needsCompaction(int numCandidates) {
|
||||
@Override
|
||||
public boolean needsCompaction(final Collection<StoreFile> storeFiles,
|
||||
final List<StoreFile> filesCompacting) {
|
||||
int numCandidates = storeFiles.size() - filesCompacting.size();
|
||||
return numCandidates > comConf.getMinFilesToCompact();
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue