HBASE-15665 Support using different StoreFileComparators for different CompactionPolicies
This commit is contained in:
parent
6d7a7fa3aa
commit
bd3b9753a9
|
@ -57,8 +57,9 @@ public class DateTieredStoreEngine extends StoreEngine<DefaultStoreFlusher,
|
|||
protected void createComponents(Configuration conf, Store store, CellComparator kvComparator)
|
||||
throws IOException {
|
||||
this.compactionPolicy = new DateTieredCompactionPolicy(conf, store);
|
||||
this.storeFileManager = new DefaultStoreFileManager(kvComparator, conf,
|
||||
compactionPolicy.getConf());
|
||||
this.storeFileManager =
|
||||
new DefaultStoreFileManager(kvComparator, StoreFile.Comparators.SEQ_ID_MAX_TIMESTAMP, conf,
|
||||
compactionPolicy.getConf());
|
||||
this.storeFlusher = new DefaultStoreFlusher(conf, store);
|
||||
this.compactor = new DateTieredCompactor(conf, store);
|
||||
}
|
||||
|
|
|
@ -68,8 +68,9 @@ public class DefaultStoreEngine extends StoreEngine<
|
|||
createCompactor(conf, store);
|
||||
createCompactionPolicy(conf, store);
|
||||
createStoreFlusher(conf, store);
|
||||
storeFileManager = new DefaultStoreFileManager(kvComparator, conf, compactionPolicy.getConf());
|
||||
|
||||
storeFileManager =
|
||||
new DefaultStoreFileManager(kvComparator, StoreFile.Comparators.SEQ_ID, conf,
|
||||
compactionPolicy.getConf());
|
||||
}
|
||||
|
||||
protected void createCompactor(Configuration conf, Store store) throws IOException {
|
||||
|
|
|
@ -22,9 +22,14 @@ import java.io.IOException;
|
|||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.Comparator;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
|
||||
import com.google.common.collect.ImmutableCollection;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.Lists;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
|
@ -34,10 +39,6 @@ import org.apache.hadoop.hbase.KeyValue;
|
|||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
|
||||
|
||||
import com.google.common.collect.ImmutableCollection;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.Lists;
|
||||
|
||||
/**
|
||||
* Default implementation of StoreFileManager. Not thread-safe.
|
||||
*/
|
||||
|
@ -48,7 +49,7 @@ class DefaultStoreFileManager implements StoreFileManager {
|
|||
private final CellComparator kvComparator;
|
||||
private final CompactionConfiguration comConf;
|
||||
private final int blockingFileCount;
|
||||
|
||||
private final Comparator<StoreFile> storeFileComparator;
|
||||
/**
|
||||
* List of store files inside this store. This is an immutable list that
|
||||
* is atomically replaced when its contents change.
|
||||
|
@ -62,9 +63,11 @@ class DefaultStoreFileManager implements StoreFileManager {
|
|||
*/
|
||||
private volatile List<StoreFile> compactedfiles = null;
|
||||
|
||||
public DefaultStoreFileManager(CellComparator kvComparator, Configuration conf,
|
||||
public DefaultStoreFileManager(CellComparator kvComparator,
|
||||
Comparator<StoreFile> storeFileComparator, Configuration conf,
|
||||
CompactionConfiguration comConf) {
|
||||
this.kvComparator = kvComparator;
|
||||
this.storeFileComparator = storeFileComparator;
|
||||
this.comConf = comConf;
|
||||
this.blockingFileCount =
|
||||
conf.getInt(HStore.BLOCKING_STOREFILES_KEY, HStore.DEFAULT_BLOCKING_STOREFILE_COUNT);
|
||||
|
@ -210,13 +213,13 @@ class DefaultStoreFileManager implements StoreFileManager {
|
|||
}
|
||||
|
||||
private void sortAndSetStoreFiles(List<StoreFile> storeFiles) {
|
||||
Collections.sort(storeFiles, StoreFile.Comparators.SEQ_ID);
|
||||
Collections.sort(storeFiles, storeFileComparator);
|
||||
storefiles = ImmutableList.copyOf(storeFiles);
|
||||
}
|
||||
|
||||
private List<StoreFile> sortCompactedfiles(List<StoreFile> storefiles) {
|
||||
// Sorting may not be really needed here for the compacted files?
|
||||
Collections.sort(storefiles, StoreFile.Comparators.SEQ_ID);
|
||||
Collections.sort(storefiles, storeFileComparator);
|
||||
return new ArrayList<StoreFile>(storefiles);
|
||||
}
|
||||
|
||||
|
@ -229,5 +232,10 @@ class DefaultStoreFileManager implements StoreFileManager {
|
|||
}
|
||||
return (double) (storefileCount - minFilesToCompact) / (blockingFileCount - minFilesToCompact);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Comparator<StoreFile> getStoreFileComparator() {
|
||||
return storeFileComparator;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -1466,7 +1466,8 @@ public class HStore implements Store {
|
|||
filesToCompact = filesToCompact.subList(count - N, count);
|
||||
isMajor = (filesToCompact.size() == storeEngine.getStoreFileManager().getStorefileCount());
|
||||
filesCompacting.addAll(filesToCompact);
|
||||
Collections.sort(filesCompacting, StoreFile.Comparators.SEQ_ID);
|
||||
Collections.sort(filesCompacting, storeEngine.getStoreFileManager()
|
||||
.getStoreFileComparator());
|
||||
}
|
||||
} finally {
|
||||
this.lock.readLock().unlock();
|
||||
|
@ -1655,7 +1656,7 @@ public class HStore implements Store {
|
|||
Preconditions.checkArgument(false, "%s overlaps with %s", filesToAdd, filesCompacting);
|
||||
}
|
||||
filesCompacting.addAll(filesToAdd);
|
||||
Collections.sort(filesCompacting, StoreFile.Comparators.SEQ_ID);
|
||||
Collections.sort(filesCompacting, storeEngine.getStoreFileManager().getStoreFileComparator());
|
||||
}
|
||||
|
||||
private void removeUnneededFiles() throws IOException {
|
||||
|
|
|
@ -20,14 +20,15 @@ package org.apache.hadoop.hbase.regionserver;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.util.Collection;
|
||||
import java.util.Comparator;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import com.google.common.collect.ImmutableCollection;
|
||||
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
|
||||
import com.google.common.collect.ImmutableCollection;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
|
||||
/**
|
||||
* Manages the store files and basic metadata about that that determines the logical structure
|
||||
|
@ -163,4 +164,10 @@ public interface StoreFileManager {
|
|||
* @see Store#getCompactionPressure()
|
||||
*/
|
||||
double getCompactionPressure();
|
||||
|
||||
/**
|
||||
* @return the comparator used to sort storefiles. Usually, the
|
||||
* {@link StoreFile#getMaxSequenceId()} is the first priority.
|
||||
*/
|
||||
Comparator<StoreFile> getStoreFileComparator();
|
||||
}
|
||||
|
|
|
@ -23,6 +23,7 @@ import java.util.ArrayList;
|
|||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.Comparator;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
|
@ -1072,4 +1073,9 @@ public class StripeStoreFileManager
|
|||
}
|
||||
return max;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Comparator<StoreFile> getStoreFileComparator() {
|
||||
return StoreFile.Comparators.SEQ_ID;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -191,10 +191,8 @@ public class DateTieredCompactionPolicy extends SortedCompactionPolicy {
|
|||
long now = EnvironmentEdgeManager.currentTime();
|
||||
long oldestToCompact = getOldestToCompact(comConf.getMaxStoreFileAgeMillis(), now);
|
||||
|
||||
// Make sure the store files is sorted by SeqId then maxTimestamp
|
||||
List<StoreFile> storeFileList = Lists.newArrayList(filterOldStoreFiles(candidateSelection,
|
||||
oldestToCompact));
|
||||
Collections.sort(storeFileList, StoreFile.Comparators.SEQ_ID_MAX_TIMESTAMP);
|
||||
|
||||
List<Pair<StoreFile, Long>> storefileMaxTimestampPairs =
|
||||
Lists.newArrayListWithCapacity(Iterables.size(storeFileList));
|
||||
|
|
Loading…
Reference in New Issue