HBASE-15665 Support using different StoreFileComparators for different CompactionPolicies
This commit is contained in:
parent
2b5da6f7a0
commit
22b1061044
|
@ -57,8 +57,9 @@ public class DateTieredStoreEngine extends StoreEngine<DefaultStoreFlusher,
|
||||||
protected void createComponents(Configuration conf, Store store, KVComparator kvComparator)
|
protected void createComponents(Configuration conf, Store store, KVComparator kvComparator)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
this.compactionPolicy = new DateTieredCompactionPolicy(conf, store);
|
this.compactionPolicy = new DateTieredCompactionPolicy(conf, store);
|
||||||
this.storeFileManager = new DefaultStoreFileManager(kvComparator, conf,
|
this.storeFileManager =
|
||||||
compactionPolicy.getConf());
|
new DefaultStoreFileManager(kvComparator, StoreFile.Comparators.SEQ_ID_MAX_TIMESTAMP, conf,
|
||||||
|
compactionPolicy.getConf());
|
||||||
this.storeFlusher = new DefaultStoreFlusher(conf, store);
|
this.storeFlusher = new DefaultStoreFlusher(conf, store);
|
||||||
this.compactor = new DateTieredCompactor(conf, store);
|
this.compactor = new DateTieredCompactor(conf, store);
|
||||||
}
|
}
|
||||||
|
|
|
@ -81,7 +81,9 @@ public class DefaultStoreEngine extends StoreEngine<
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
throw new IOException("Unable to load configured compaction policy '" + className + "'", e);
|
throw new IOException("Unable to load configured compaction policy '" + className + "'", e);
|
||||||
}
|
}
|
||||||
storeFileManager = new DefaultStoreFileManager(kvComparator, conf, compactionPolicy.getConf());
|
storeFileManager =
|
||||||
|
new DefaultStoreFileManager(kvComparator, StoreFile.Comparators.SEQ_ID, conf,
|
||||||
|
compactionPolicy.getConf());
|
||||||
className = conf.get(
|
className = conf.get(
|
||||||
DEFAULT_STORE_FLUSHER_CLASS_KEY, DEFAULT_STORE_FLUSHER_CLASS.getName());
|
DEFAULT_STORE_FLUSHER_CLASS_KEY, DEFAULT_STORE_FLUSHER_CLASS.getName());
|
||||||
try {
|
try {
|
||||||
|
|
|
@ -22,9 +22,14 @@ import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
|
import java.util.Comparator;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
|
import com.google.common.collect.ImmutableCollection;
|
||||||
|
import com.google.common.collect.ImmutableList;
|
||||||
|
import com.google.common.collect.Lists;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
@ -34,10 +39,6 @@ import org.apache.hadoop.hbase.KeyValue.KVComparator;
|
||||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
|
import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
|
||||||
|
|
||||||
import com.google.common.collect.ImmutableCollection;
|
|
||||||
import com.google.common.collect.ImmutableList;
|
|
||||||
import com.google.common.collect.Lists;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Default implementation of StoreFileManager. Not thread-safe.
|
* Default implementation of StoreFileManager. Not thread-safe.
|
||||||
*/
|
*/
|
||||||
|
@ -48,7 +49,7 @@ class DefaultStoreFileManager implements StoreFileManager {
|
||||||
private final KVComparator kvComparator;
|
private final KVComparator kvComparator;
|
||||||
private final CompactionConfiguration comConf;
|
private final CompactionConfiguration comConf;
|
||||||
private final int blockingFileCount;
|
private final int blockingFileCount;
|
||||||
|
private final Comparator<StoreFile> storeFileComparator;
|
||||||
/**
|
/**
|
||||||
* List of store files inside this store. This is an immutable list that
|
* List of store files inside this store. This is an immutable list that
|
||||||
* is atomically replaced when its contents change.
|
* is atomically replaced when its contents change.
|
||||||
|
@ -62,9 +63,11 @@ class DefaultStoreFileManager implements StoreFileManager {
|
||||||
*/
|
*/
|
||||||
private volatile List<StoreFile> compactedfiles = null;
|
private volatile List<StoreFile> compactedfiles = null;
|
||||||
|
|
||||||
public DefaultStoreFileManager(KVComparator kvComparator, Configuration conf,
|
public DefaultStoreFileManager(KVComparator kvComparator,
|
||||||
|
Comparator<StoreFile> storeFileComparator, Configuration conf,
|
||||||
CompactionConfiguration comConf) {
|
CompactionConfiguration comConf) {
|
||||||
this.kvComparator = kvComparator;
|
this.kvComparator = kvComparator;
|
||||||
|
this.storeFileComparator = storeFileComparator;
|
||||||
this.comConf = comConf;
|
this.comConf = comConf;
|
||||||
this.blockingFileCount =
|
this.blockingFileCount =
|
||||||
conf.getInt(HStore.BLOCKING_STOREFILES_KEY, HStore.DEFAULT_BLOCKING_STOREFILE_COUNT);
|
conf.getInt(HStore.BLOCKING_STOREFILES_KEY, HStore.DEFAULT_BLOCKING_STOREFILE_COUNT);
|
||||||
|
@ -210,13 +213,13 @@ class DefaultStoreFileManager implements StoreFileManager {
|
||||||
}
|
}
|
||||||
|
|
||||||
private void sortAndSetStoreFiles(List<StoreFile> storeFiles) {
|
private void sortAndSetStoreFiles(List<StoreFile> storeFiles) {
|
||||||
Collections.sort(storeFiles, StoreFile.Comparators.SEQ_ID);
|
Collections.sort(storeFiles, storeFileComparator);
|
||||||
storefiles = ImmutableList.copyOf(storeFiles);
|
storefiles = ImmutableList.copyOf(storeFiles);
|
||||||
}
|
}
|
||||||
|
|
||||||
private List<StoreFile> sortCompactedfiles(List<StoreFile> storefiles) {
|
private List<StoreFile> sortCompactedfiles(List<StoreFile> storefiles) {
|
||||||
// Sorting may not be really needed here for the compacted files?
|
// Sorting may not be really needed here for the compacted files?
|
||||||
Collections.sort(storefiles, StoreFile.Comparators.SEQ_ID);
|
Collections.sort(storefiles, storeFileComparator);
|
||||||
return new ArrayList<StoreFile>(storefiles);
|
return new ArrayList<StoreFile>(storefiles);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -229,5 +232,10 @@ class DefaultStoreFileManager implements StoreFileManager {
|
||||||
}
|
}
|
||||||
return (double) (storefileCount - minFilesToCompact) / (blockingFileCount - minFilesToCompact);
|
return (double) (storefileCount - minFilesToCompact) / (blockingFileCount - minFilesToCompact);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Comparator<StoreFile> getStoreFileComparator() {
|
||||||
|
return storeFileComparator;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1511,7 +1511,8 @@ public class HStore implements Store {
|
||||||
filesToCompact = filesToCompact.subList(count - N, count);
|
filesToCompact = filesToCompact.subList(count - N, count);
|
||||||
isMajor = (filesToCompact.size() == storeEngine.getStoreFileManager().getStorefileCount());
|
isMajor = (filesToCompact.size() == storeEngine.getStoreFileManager().getStorefileCount());
|
||||||
filesCompacting.addAll(filesToCompact);
|
filesCompacting.addAll(filesToCompact);
|
||||||
Collections.sort(filesCompacting, StoreFile.Comparators.SEQ_ID);
|
Collections.sort(filesCompacting, storeEngine.getStoreFileManager()
|
||||||
|
.getStoreFileComparator());
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
this.lock.readLock().unlock();
|
this.lock.readLock().unlock();
|
||||||
|
@ -1700,7 +1701,7 @@ public class HStore implements Store {
|
||||||
Preconditions.checkArgument(false, "%s overlaps with %s", filesToAdd, filesCompacting);
|
Preconditions.checkArgument(false, "%s overlaps with %s", filesToAdd, filesCompacting);
|
||||||
}
|
}
|
||||||
filesCompacting.addAll(filesToAdd);
|
filesCompacting.addAll(filesToAdd);
|
||||||
Collections.sort(filesCompacting, StoreFile.Comparators.SEQ_ID);
|
Collections.sort(filesCompacting, storeEngine.getStoreFileManager().getStoreFileComparator());
|
||||||
}
|
}
|
||||||
|
|
||||||
private void removeUnneededFiles() throws IOException {
|
private void removeUnneededFiles() throws IOException {
|
||||||
|
|
|
@ -20,14 +20,15 @@ package org.apache.hadoop.hbase.regionserver;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
|
import java.util.Comparator;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
import com.google.common.collect.ImmutableCollection;
|
||||||
|
|
||||||
import org.apache.hadoop.hbase.Cell;
|
import org.apache.hadoop.hbase.Cell;
|
||||||
import org.apache.hadoop.hbase.KeyValue;
|
import org.apache.hadoop.hbase.KeyValue;
|
||||||
|
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||||
import com.google.common.collect.ImmutableCollection;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Manages the store files and basic metadata about that that determines the logical structure
|
* Manages the store files and basic metadata about that that determines the logical structure
|
||||||
|
@ -163,4 +164,10 @@ public interface StoreFileManager {
|
||||||
* @see Store#getCompactionPressure()
|
* @see Store#getCompactionPressure()
|
||||||
*/
|
*/
|
||||||
double getCompactionPressure();
|
double getCompactionPressure();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return the comparator used to sort storefiles. Usually, the
|
||||||
|
* {@link StoreFile#getMaxSequenceId()} is the first priority.
|
||||||
|
*/
|
||||||
|
Comparator<StoreFile> getStoreFileComparator();
|
||||||
}
|
}
|
||||||
|
|
|
@ -23,6 +23,7 @@ import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
|
import java.util.Comparator;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
@ -1048,4 +1049,9 @@ public class StripeStoreFileManager
|
||||||
}
|
}
|
||||||
return max;
|
return max;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Comparator<StoreFile> getStoreFileComparator() {
|
||||||
|
return StoreFile.Comparators.SEQ_ID;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -191,10 +191,8 @@ public class DateTieredCompactionPolicy extends SortedCompactionPolicy {
|
||||||
long now = EnvironmentEdgeManager.currentTime();
|
long now = EnvironmentEdgeManager.currentTime();
|
||||||
long oldestToCompact = getOldestToCompact(comConf.getMaxStoreFileAgeMillis(), now);
|
long oldestToCompact = getOldestToCompact(comConf.getMaxStoreFileAgeMillis(), now);
|
||||||
|
|
||||||
// Make sure the store files is sorted by SeqId then maxTimestamp
|
|
||||||
List<StoreFile> storeFileList = Lists.newArrayList(filterOldStoreFiles(candidateSelection,
|
List<StoreFile> storeFileList = Lists.newArrayList(filterOldStoreFiles(candidateSelection,
|
||||||
oldestToCompact));
|
oldestToCompact));
|
||||||
Collections.sort(storeFileList, StoreFile.Comparators.SEQ_ID_MAX_TIMESTAMP);
|
|
||||||
|
|
||||||
List<Pair<StoreFile, Long>> storefileMaxTimestampPairs =
|
List<Pair<StoreFile, Long>> storefileMaxTimestampPairs =
|
||||||
Lists.newArrayListWithCapacity(Iterables.size(storeFileList));
|
Lists.newArrayListWithCapacity(Iterables.size(storeFileList));
|
||||||
|
|
Loading…
Reference in New Issue