HBASE-7964 requestCompaction priority argument is not used (except for user compaction check)
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1452444 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
ca83cf8264
commit
80273bd8e7
|
@ -46,7 +46,7 @@ public class DefaultStoreEngine extends StoreEngine<
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void createComponents() {
|
protected void createComponents() {
|
||||||
storeFileManager = new DefaultStoreFileManager(this.comparator);
|
storeFileManager = new DefaultStoreFileManager(this.comparator, this.conf);
|
||||||
|
|
||||||
// TODO: compactor and policy may be separately pluggable, but must derive from default ones.
|
// TODO: compactor and policy may be separately pluggable, but must derive from default ones.
|
||||||
compactor = new DefaultCompactor(this.conf, this.store);
|
compactor = new DefaultCompactor(this.conf, this.store);
|
||||||
|
|
|
@ -28,6 +28,7 @@ import java.util.List;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hbase.KeyValue;
|
import org.apache.hadoop.hbase.KeyValue;
|
||||||
import org.apache.hadoop.hbase.KeyValue.KVComparator;
|
import org.apache.hadoop.hbase.KeyValue.KVComparator;
|
||||||
|
|
||||||
|
@ -43,6 +44,7 @@ class DefaultStoreFileManager implements StoreFileManager {
|
||||||
static final Log LOG = LogFactory.getLog(DefaultStoreFileManager.class);
|
static final Log LOG = LogFactory.getLog(DefaultStoreFileManager.class);
|
||||||
|
|
||||||
private final KVComparator kvComparator;
|
private final KVComparator kvComparator;
|
||||||
|
private final Configuration conf;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* List of store files inside this store. This is an immutable list that
|
* List of store files inside this store. This is an immutable list that
|
||||||
|
@ -50,8 +52,9 @@ class DefaultStoreFileManager implements StoreFileManager {
|
||||||
*/
|
*/
|
||||||
private volatile ImmutableList<StoreFile> storefiles = null;
|
private volatile ImmutableList<StoreFile> storefiles = null;
|
||||||
|
|
||||||
public DefaultStoreFileManager(KVComparator kvComparator) {
|
public DefaultStoreFileManager(KVComparator kvComparator, Configuration conf) {
|
||||||
this.kvComparator = kvComparator;
|
this.kvComparator = kvComparator;
|
||||||
|
this.conf = conf;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -124,9 +127,17 @@ class DefaultStoreFileManager implements StoreFileManager {
|
||||||
return getStorefiles();
|
return getStorefiles();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int getStoreCompactionPriority() {
|
||||||
|
int blockingFileCount = conf.getInt(
|
||||||
|
"hbase.hstore.blockingStoreFiles", HStore.DEFAULT_BLOCKING_STOREFILE_COUNT);
|
||||||
|
return blockingFileCount - storefiles.size();
|
||||||
|
}
|
||||||
|
|
||||||
private void sortAndSetStoreFiles(List<StoreFile> storeFiles) {
|
private void sortAndSetStoreFiles(List<StoreFile> storeFiles) {
|
||||||
Collections.sort(storeFiles, StoreFile.Comparators.SEQ_ID);
|
Collections.sort(storeFiles, StoreFile.Comparators.SEQ_ID);
|
||||||
storefiles = ImmutableList.copyOf(storeFiles);
|
storefiles = ImmutableList.copyOf(storeFiles);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1324,7 +1324,9 @@ public class HStore implements Store {
|
||||||
this.forceMajor = this.forceMajor && !isMajor;
|
this.forceMajor = this.forceMajor && !isMajor;
|
||||||
|
|
||||||
// Set common request properties.
|
// Set common request properties.
|
||||||
compaction.getRequest().setPriority(getCompactPriority(priority));
|
// Set priority, either override value supplied by caller or from store.
|
||||||
|
compaction.getRequest().setPriority(
|
||||||
|
(priority != Store.NO_PRIORITY) ? priority : getCompactPriority());
|
||||||
compaction.getRequest().setIsMajor(isMajor);
|
compaction.getRequest().setIsMajor(isMajor);
|
||||||
compaction.getRequest().setDescription(
|
compaction.getRequest().setDescription(
|
||||||
region.getRegionNameAsString(), getColumnFamilyName());
|
region.getRegionNameAsString(), getColumnFamilyName());
|
||||||
|
@ -1755,18 +1757,9 @@ public class HStore implements Store {
|
||||||
return this.memstore.heapSize();
|
return this.memstore.heapSize();
|
||||||
}
|
}
|
||||||
|
|
||||||
public int getCompactPriority() {
|
|
||||||
return getCompactPriority(Store.NO_PRIORITY);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int getCompactPriority(int priority) {
|
public int getCompactPriority() {
|
||||||
// If this is a user-requested compaction, leave this at the user priority
|
return this.storeFileManager.getStoreCompactionPriority();
|
||||||
if (priority != Store.PRIORITY_USER) {
|
|
||||||
priority = this.compactionPolicy.getSystemCompactionPriority(
|
|
||||||
this.storeFileManager.getStorefiles());
|
|
||||||
}
|
|
||||||
return priority;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -182,13 +182,6 @@ public interface Store extends HeapSize, StoreConfigInformation {
|
||||||
|
|
||||||
public int getCompactPriority();
|
public int getCompactPriority();
|
||||||
|
|
||||||
/**
|
|
||||||
* @param priority priority to check against. When priority is {@link Store#PRIORITY_USER},
|
|
||||||
* {@link Store#PRIORITY_USER} is returned.
|
|
||||||
* @return The priority that this store has in the compaction queue.
|
|
||||||
*/
|
|
||||||
public int getCompactPriority(int priority);
|
|
||||||
|
|
||||||
public StoreFlusher getStoreFlusher(long cacheFlushId);
|
public StoreFlusher getStoreFlusher(long cacheFlushId);
|
||||||
|
|
||||||
// Split oriented methods
|
// Split oriented methods
|
||||||
|
|
|
@ -119,4 +119,9 @@ public interface StoreFileManager {
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
public abstract byte[] getSplitPoint() throws IOException;
|
public abstract byte[] getSplitPoint() throws IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return The store compaction priority.
|
||||||
|
*/
|
||||||
|
public abstract int getStoreCompactionPriority();
|
||||||
}
|
}
|
||||||
|
|
|
@ -62,7 +62,6 @@ public class CompactionConfiguration {
|
||||||
boolean shouldDeleteExpired;
|
boolean shouldDeleteExpired;
|
||||||
long majorCompactionPeriod;
|
long majorCompactionPeriod;
|
||||||
float majorCompactionJitter;
|
float majorCompactionJitter;
|
||||||
int blockingStoreFileCount;
|
|
||||||
|
|
||||||
CompactionConfiguration(Configuration conf, StoreConfigInformation storeConfigInfo) {
|
CompactionConfiguration(Configuration conf, StoreConfigInformation storeConfigInfo) {
|
||||||
this.conf = conf;
|
this.conf = conf;
|
||||||
|
@ -82,8 +81,6 @@ public class CompactionConfiguration {
|
||||||
shouldDeleteExpired = conf.getBoolean("hbase.store.delete.expired.storefile", true);
|
shouldDeleteExpired = conf.getBoolean("hbase.store.delete.expired.storefile", true);
|
||||||
majorCompactionPeriod = conf.getLong(HConstants.MAJOR_COMPACTION_PERIOD, 1000*60*60*24);
|
majorCompactionPeriod = conf.getLong(HConstants.MAJOR_COMPACTION_PERIOD, 1000*60*60*24);
|
||||||
majorCompactionJitter = conf.getFloat("hbase.hregion.majorcompaction.jitter", 0.20F);
|
majorCompactionJitter = conf.getFloat("hbase.hregion.majorcompaction.jitter", 0.20F);
|
||||||
blockingStoreFileCount =
|
|
||||||
conf.getInt("hbase.hstore.blockingStoreFiles", HStore.DEFAULT_BLOCKING_STOREFILE_COUNT);
|
|
||||||
|
|
||||||
LOG.info("Compaction configuration " + this.toString());
|
LOG.info("Compaction configuration " + this.toString());
|
||||||
}
|
}
|
||||||
|
@ -105,13 +102,6 @@ public class CompactionConfiguration {
|
||||||
majorCompactionJitter);
|
majorCompactionJitter);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* @return store file count that will cause the memstore of this store to be blocked.
|
|
||||||
*/
|
|
||||||
int getBlockingStorefileCount() {
|
|
||||||
return this.blockingStoreFileCount;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return lower bound below which compaction is selected without ratio test
|
* @return lower bound below which compaction is selected without ratio test
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -42,16 +42,6 @@ public abstract class CompactionPolicy {
|
||||||
this.comConf = new CompactionConfiguration(conf, this.storeConfigInfo);
|
this.comConf = new CompactionConfiguration(conf, this.storeConfigInfo);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* @param storeFiles Store files in the store.
|
|
||||||
* @return The system compaction priority of the store, based on storeFiles.
|
|
||||||
* The priority range is as such - the smaller values are higher priority;
|
|
||||||
* 1 is user priority; only very important, blocking compactions should use
|
|
||||||
* values lower than that. With default settings, depending on the number of
|
|
||||||
* store files, the non-blocking priority will be in 2-6 range.
|
|
||||||
*/
|
|
||||||
public abstract int getSystemCompactionPriority(final Collection<StoreFile> storeFiles);
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param filesToCompact Files to compact. Can be null.
|
* @param filesToCompact Files to compact. Can be null.
|
||||||
* @return True if we should run a major compaction.
|
* @return True if we should run a major compaction.
|
||||||
|
|
|
@ -57,7 +57,6 @@ public class DefaultCompactionPolicy extends CompactionPolicy {
|
||||||
super(conf, storeConfigInfo);
|
super(conf, storeConfigInfo);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
private ArrayList<StoreFile> getCurrentEligibleFiles(
|
private ArrayList<StoreFile> getCurrentEligibleFiles(
|
||||||
ArrayList<StoreFile> candidateFiles, final List<StoreFile> filesCompacting) {
|
ArrayList<StoreFile> candidateFiles, final List<StoreFile> filesCompacting) {
|
||||||
// candidates = all storefiles not already in compaction queue
|
// candidates = all storefiles not already in compaction queue
|
||||||
|
@ -77,11 +76,6 @@ public class DefaultCompactionPolicy extends CompactionPolicy {
|
||||||
return getCurrentEligibleFiles(new ArrayList<StoreFile>(candidates), filesCompacting);
|
return getCurrentEligibleFiles(new ArrayList<StoreFile>(candidates), filesCompacting);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public int getSystemCompactionPriority(final Collection<StoreFile> storeFiles) {
|
|
||||||
return this.comConf.getBlockingStorefileCount() - storeFiles.size();
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param candidateFiles candidate files, ordered from oldest to newest
|
* @param candidateFiles candidate files, ordered from oldest to newest
|
||||||
* @return subset copy of candidate list that meets compaction criteria
|
* @return subset copy of candidate list that meets compaction criteria
|
||||||
|
|
Loading…
Reference in New Issue