HBASE-7843 Enable encapsulating compaction policy/compactor/store file manager interaction shenanigans

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1450407 13f79535-47bb-0310-9956-ffa450edef68

parent 05711f7553
commit 2eeb5761b3
@@ -41,7 +41,6 @@ import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.compactions.CompactSelection;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;

@@ -26,6 +26,7 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

@@ -34,7 +35,12 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.RemoteExceptionHandler;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.util.StringUtils;

import com.google.common.base.Preconditions;

@@ -89,8 +95,7 @@ public class CompactSplitThread implements CompactionRequestor {
return t;
}
});
this.largeCompactions
.setRejectedExecutionHandler(new CompactionRequest.Rejection());
this.largeCompactions.setRejectedExecutionHandler(new Rejection());
this.smallCompactions = new ThreadPoolExecutor(smallThreads, smallThreads,
60, TimeUnit.SECONDS, new PriorityBlockingQueue<Runnable>(),
new ThreadFactory() {
@@ -102,7 +107,7 @@ public class CompactSplitThread implements CompactionRequestor {
}
});
this.smallCompactions
.setRejectedExecutionHandler(new CompactionRequest.Rejection());
.setRejectedExecutionHandler(new Rejection());
this.splits = (ThreadPoolExecutor)
Executors.newFixedThreadPool(splitThreads,
new ThreadFactory() {
@@ -193,7 +198,7 @@ public class CompactSplitThread implements CompactionRequestor {

@Override
public synchronized List<CompactionRequest> requestCompaction(final HRegion r, final String why,
List<CompactionRequest> requests) throws IOException {
List<Pair<CompactionRequest, Store>> requests) throws IOException {
return requestCompaction(r, why, Store.NO_PRIORITY, requests);
}

@@ -205,7 +210,7 @@ public class CompactSplitThread implements CompactionRequestor {

@Override
public synchronized List<CompactionRequest> requestCompaction(final HRegion r, final String why,
int p, List<CompactionRequest> requests) throws IOException {
int p, List<Pair<CompactionRequest, Store>> requests) throws IOException {
// not a special compaction request, so make our own list
List<CompactionRequest> ret;
if (requests == null) {
@@ -215,8 +220,8 @@ public class CompactSplitThread implements CompactionRequestor {
}
} else {
ret = new ArrayList<CompactionRequest>(requests.size());
for (CompactionRequest request : requests) {
requests.add(requestCompaction(r, request.getStore(), why, p, request));
for (Pair<CompactionRequest, Store> pair : requests) {
ret.add(requestCompaction(r, pair.getSecond(), why, p, pair.getFirst()));
}
}
return ret;
@@ -235,28 +240,29 @@ public class CompactSplitThread implements CompactionRequestor {
if (this.server.isStopped()) {
return null;
}
CompactionRequest cr = s.requestCompaction(priority, request);
if (cr != null) {
cr.setServer(server);
if (priority != Store.NO_PRIORITY) {
cr.setPriority(priority);
}
ThreadPoolExecutor pool = s.throttleCompaction(cr.getSize())
? largeCompactions : smallCompactions;
pool.execute(cr);
if (LOG.isDebugEnabled()) {
String type = (pool == smallCompactions) ? "Small " : "Large ";
LOG.debug(type + "Compaction requested: " + cr
+ (why != null && !why.isEmpty() ? "; Because: " + why : "")
+ "; " + this);
}
} else {
CompactionContext compaction = s.requestCompaction(priority, request);
if (compaction == null) {
if(LOG.isDebugEnabled()) {
LOG.debug("Not compacting " + r.getRegionNameAsString() +
" because compaction request was cancelled");
}
return null;
}
return cr;

assert compaction.hasSelection();
if (priority != Store.NO_PRIORITY) {
compaction.getRequest().setPriority(priority);
}
ThreadPoolExecutor pool = s.throttleCompaction(compaction.getRequest().getSize())
? largeCompactions : smallCompactions;
pool.execute(new CompactionRunner(s, r, compaction));
if (LOG.isDebugEnabled()) {
String type = (pool == smallCompactions) ? "Small " : "Large ";
LOG.debug(type + "Compaction requested: " + compaction
+ (why != null && !why.isEmpty() ? "; Because: " + why : "")
+ "; " + this);
}
return compaction.getRequest();
}

/**
@@ -309,4 +315,73 @@ public class CompactSplitThread implements CompactionRequestor {
public int getRegionSplitLimit() {
return this.regionSplitLimit;
}

private class CompactionRunner implements Runnable, Comparable<CompactionRunner> {
private final Store store;
private final HRegion region;
private final CompactionContext compaction;

public CompactionRunner(Store store, HRegion region, CompactionContext compaction) {
super();
this.store = store;
this.region = region;
this.compaction = compaction;
}

@Override
public void run() {
Preconditions.checkNotNull(server);
if (server.isStopped()) {
return;
}
this.compaction.getRequest().beforeExecute();
try {
// Note: please don't put single-compaction logic here;
// put it into region/store/etc. This is CST logic.
long start = EnvironmentEdgeManager.currentTimeMillis();
boolean completed = region.compact(compaction, store);
long now = EnvironmentEdgeManager.currentTimeMillis();
LOG.info(((completed) ? "Completed" : "Aborted") + " compaction: " +
this + "; duration=" + StringUtils.formatTimeDiff(now, start));
if (completed) {
// degenerate case: blocked regions require recursive enqueues
if (store.getCompactPriority() <= 0) {
requestCompaction(region, store, "Recursive enqueue", null);
} else {
// see if the compaction has caused us to exceed max region size
requestSplit(region);
}
}
} catch (IOException ex) {
LOG.error("Compaction failed " + this, RemoteExceptionHandler.checkIOException(ex));
server.checkFileSystem();
} catch (Exception ex) {
LOG.error("Compaction failed " + this, ex);
server.checkFileSystem();
} finally {
LOG.debug("CompactSplitThread Status: " + CompactSplitThread.this);
}
this.compaction.getRequest().afterExecute();
}

@Override
public int compareTo(CompactionRunner o) {
// Only compare the underlying request, for queue sorting purposes.
return this.compaction.getRequest().compareTo(o.compaction.getRequest());
}
}

/**
* Cleanup class to use when rejecting a compaction request from the queue.
*/
private static class Rejection implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable runnable, ThreadPoolExecutor pool) {
if (runnable instanceof CompactionRunner) {
CompactionRunner runner = (CompactionRunner)runnable;
LOG.debug("Compaction Rejected: " + runner);
runner.store.cancelRequestedCompaction(runner.compaction);
}
}
}
}

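For orientation, here is a minimal caller-side sketch of the reworked CompactionRequestor API used above: requests are now passed as Pair<CompactionRequest, Store>, so the caller names the target store explicitly instead of the request carrying it. Only the requestCompaction(HRegion, String, List<Pair<CompactionRequest, Store>>) signature and the CompactionRequest(Collection<StoreFile>) constructor are taken from this commit; the sketch class, variable names, and the "why" string are placeholders.

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

import org.apache.hadoop.hbase.regionserver.CompactionRequestor;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.util.Pair;

public class RequestCompactionSketch {
  /** Queues one custom compaction per (request, store) pair on the region's requestor. */
  static void queueCustomCompaction(CompactionRequestor requestor, HRegion region,
      Store store, Collection<StoreFile> candidateFiles) throws IOException {
    List<Pair<CompactionRequest, Store>> requests =
        new ArrayList<Pair<CompactionRequest, Store>>();
    // The "base request" names the files the caller wants compacted; the store's policy
    // may still refine it (see CompactionRequest.combineWith later in this commit).
    requests.add(new Pair<CompactionRequest, Store>(
        new CompactionRequest(candidateFiles), store));
    requestor.requestCompaction(region, "sketch: externally requested compaction", requests);
  }
}
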
@@ -23,6 +23,7 @@ import java.util.List;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.util.Pair;

@InterfaceAudience.Private
public interface CompactionRequestor {
@@ -47,7 +48,7 @@ public interface CompactionRequestor {
* @throws IOException
*/
public List<CompactionRequest> requestCompaction(final HRegion r, final String why,
List<CompactionRequest> requests)
List<Pair<CompactionRequest, Store>> requests)
throws IOException;

/**
@@ -74,7 +75,7 @@ public interface CompactionRequestor {
* @throws IOException
*/
public List<CompactionRequest> requestCompaction(final HRegion r, final String why, int pri,
List<CompactionRequest> requests) throws IOException;
List<Pair<CompactionRequest, Store>> requests) throws IOException;

/**
* @param r Region to compact

@@ -56,7 +56,7 @@ import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.mapreduce.JobUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.util.Bytes;
@@ -156,8 +156,9 @@ public class CompactionTool extends Configured implements Tool {
" family=" + familyDir.getName());
HStore store = getStore(region, familyDir);
do {
CompactionRequest cr = store.requestCompaction();
List<StoreFile> storeFiles = store.compact(cr);
CompactionContext compaction = store.requestCompaction();
if (compaction == null) break;
List<StoreFile> storeFiles = store.compact(compaction);
if (storeFiles != null && !storeFiles.isEmpty()) {
if (keepCompactedFiles && deleteCompacted) {
for (StoreFile storeFile: storeFiles) {
@@ -465,4 +466,4 @@ public class CompactionTool extends Configured implements Tool {
public static void main(String[] args) throws Exception {
System.exit(ToolRunner.run(HBaseConfiguration.create(), new CompactionTool(), args));
}
}
}

@@ -18,28 +18,64 @@
*/
package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.KeyValue.KVComparator;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionPolicy;
import org.apache.hadoop.hbase.regionserver.compactions.Compactor;
import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactionPolicy;
import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;

/**
* Default StoreEngine creates the default compactor, policy, and store file manager.
* Default StoreEngine creates the default compactor, policy, and store file manager, or
* their derivatives.
*/
@InterfaceAudience.Private
public class DefaultStoreEngine extends StoreEngine {
public class DefaultStoreEngine extends StoreEngine<
DefaultCompactionPolicy, DefaultCompactor, DefaultStoreFileManager> {

public DefaultStoreEngine(Configuration conf, Store store, KVComparator comparator) {
super(conf, store, comparator);
}

@Override
protected void createComponents(PP<StoreFileManager> storeFileManager,
PP<CompactionPolicy> compactionPolicy, PP<Compactor> compactor) {
storeFileManager.set(new DefaultStoreFileManager(this.comparator));
compactionPolicy.set(new DefaultCompactionPolicy(this.conf, this.store));
compactor.set(new DefaultCompactor(this.conf, this.store));
protected void createComponents() {
storeFileManager = new DefaultStoreFileManager(this.comparator);

// TODO: compactor and policy may be separately pluggable, but must derive from default ones.
compactor = new DefaultCompactor(this.conf, this.store);
compactionPolicy = new DefaultCompactionPolicy(this.conf, this.store/*as StoreConfigInfo*/);
}

@Override
protected CompactionContext createCompactionContext() {
return new DefaultCompactionContext();
}

private class DefaultCompactionContext extends CompactionContext {
@Override
public boolean select(List<StoreFile> filesCompacting, boolean isUserCompaction,
boolean mayUseOffPeak, boolean forceMajor) throws IOException {
request = compactionPolicy.selectCompaction(storeFileManager.getStorefiles(),
filesCompacting, isUserCompaction, mayUseOffPeak, forceMajor);
return request != null;
}

@Override
public List<Path> compact() throws IOException {
return compactor.compact(request);
}

@Override
public List<StoreFile> preSelect(List<StoreFile> filesCompacting) {
return compactionPolicy.preSelectCompactionForCoprocessor(
storeFileManager.getStorefiles(), filesCompacting);
}
}
}

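The point of this refactoring is that selection, execution, and file bookkeeping are now created together by a StoreEngine. A hedged sketch of a custom engine follows; it assumes only what the diffs show (the StoreEngine<CP, C, SFM> type parameters, the no-argument createComponents()/createCompactionContext() hooks, and the hbase.hstore.engine.class key read by StoreEngine.create() in a later hunk). The class itself, its placement in the regionserver package, and the unimplemented context are illustrative, not part of the patch.

package org.apache.hadoop.hbase.regionserver;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.KeyValue.KVComparator;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactionPolicy;
import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;

/** Hypothetical engine: default components, shown only to mark the extension points. */
public class SketchStoreEngine extends StoreEngine<
    DefaultCompactionPolicy, DefaultCompactor, DefaultStoreFileManager> {

  public SketchStoreEngine(Configuration conf, Store store, KVComparator comparator) {
    super(conf, store, comparator);
  }

  @Override
  protected void createComponents() {
    // With the PP<> out-parameters gone, an engine simply assigns the protected fields;
    // a real custom engine would substitute its own policy/compactor/file manager here.
    storeFileManager = new DefaultStoreFileManager(this.comparator);
    compactor = new DefaultCompactor(this.conf, this.store);
    compactionPolicy = new DefaultCompactionPolicy(this.conf, this.store);
  }

  @Override
  protected CompactionContext createCompactionContext() {
    // A real engine returns a context that ties its policy and compactor together,
    // in the spirit of DefaultCompactionContext above; elided in this sketch.
    throw new UnsupportedOperationException("not implemented in this sketch");
  }
}

// Selecting the engine (hypothetical):
//   conf.set("hbase.hstore.engine.class", SketchStoreEngine.class.getName());
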
@ -119,6 +119,7 @@ import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoRespo
|
|||
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceCall;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
|
||||
import org.apache.hadoop.hbase.regionserver.MultiVersionConsistencyControl.WriteEntry;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
|
||||
import org.apache.hadoop.hbase.regionserver.wal.HLog;
|
||||
import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
|
||||
|
@ -1291,13 +1292,9 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
*/
|
||||
public void compactStores() throws IOException {
|
||||
for (Store s : getStores().values()) {
|
||||
CompactionRequest cr = s.requestCompaction();
|
||||
if(cr != null) {
|
||||
try {
|
||||
compact(cr);
|
||||
} finally {
|
||||
s.finishRequest(cr);
|
||||
}
|
||||
CompactionContext compaction = s.requestCompaction();
|
||||
if (compaction != null) {
|
||||
compact(compaction, s);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1317,45 +1314,46 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
* @return whether the compaction completed
|
||||
* @throws IOException e
|
||||
*/
|
||||
public boolean compact(CompactionRequest cr)
|
||||
throws IOException {
|
||||
if (cr == null) {
|
||||
return false;
|
||||
}
|
||||
public boolean compact(CompactionContext compaction, Store store) throws IOException {
|
||||
assert compaction != null && compaction.hasSelection();
|
||||
assert !compaction.getRequest().getFiles().isEmpty();
|
||||
if (this.closing.get() || this.closed.get()) {
|
||||
LOG.debug("Skipping compaction on " + this + " because closing/closed");
|
||||
store.cancelRequestedCompaction(compaction);
|
||||
return false;
|
||||
}
|
||||
Preconditions.checkArgument(cr.getHRegion().equals(this));
|
||||
MonitoredTask status = null;
|
||||
boolean didPerformCompaction = false;
|
||||
// block waiting for the lock for compaction
|
||||
lock.readLock().lock();
|
||||
try {
|
||||
status = TaskMonitor.get().createStatus(
|
||||
"Compacting " + cr.getStore() + " in " + this);
|
||||
status = TaskMonitor.get().createStatus("Compacting " + store + " in " + this);
|
||||
if (this.closed.get()) {
|
||||
LOG.debug("Skipping compaction on " + this + " because closed");
|
||||
String msg = "Skipping compaction on " + this + " because closed";
|
||||
LOG.debug(msg);
|
||||
status.abort(msg);
|
||||
return false;
|
||||
}
|
||||
boolean decr = true;
|
||||
boolean wasStateSet = false;
|
||||
try {
|
||||
synchronized (writestate) {
|
||||
if (writestate.writesEnabled) {
|
||||
wasStateSet = true;
|
||||
++writestate.compacting;
|
||||
} else {
|
||||
String msg = "NOT compacting region " + this + ". Writes disabled.";
|
||||
LOG.info(msg);
|
||||
status.abort(msg);
|
||||
decr = false;
|
||||
return false;
|
||||
}
|
||||
}
|
||||
LOG.info("Starting compaction on " + cr.getStore() + " in region "
|
||||
+ this + (cr.getCompactSelection().isOffPeakCompaction()?" as an off-peak compaction":""));
|
||||
LOG.info("Starting compaction on " + store + " in region " + this
|
||||
+ (compaction.getRequest().isOffPeak()?" as an off-peak compaction":""));
|
||||
doRegionCompactionPrep();
|
||||
try {
|
||||
status.setStatus("Compacting store " + cr.getStore());
|
||||
cr.getStore().compact(cr);
|
||||
status.setStatus("Compacting store " + store);
|
||||
didPerformCompaction = true;
|
||||
store.compact(compaction);
|
||||
} catch (InterruptedIOException iioe) {
|
||||
String msg = "compaction interrupted";
|
||||
LOG.info(msg, iioe);
|
||||
|
@ -1363,7 +1361,7 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
return false;
|
||||
}
|
||||
} finally {
|
||||
if (decr) {
|
||||
if (wasStateSet) {
|
||||
synchronized (writestate) {
|
||||
--writestate.compacting;
|
||||
if (writestate.compacting <= 0) {
|
||||
|
@ -1376,6 +1374,7 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
return true;
|
||||
} finally {
|
||||
try {
|
||||
if (!didPerformCompaction) store.cancelRequestedCompaction(compaction);
|
||||
if (status != null) status.cleanup();
|
||||
} finally {
|
||||
lock.readLock().unlock();
|
||||
|
|
|
@ -66,7 +66,7 @@ import org.apache.hadoop.hbase.io.hfile.HFileScanner;
|
|||
import org.apache.hadoop.hbase.exceptions.InvalidHFileException;
|
||||
import org.apache.hadoop.hbase.io.hfile.NoOpDataBlockEncoder;
|
||||
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.CompactSelection;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.CompactionPolicy;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
|
||||
|
@ -154,8 +154,8 @@ public class HStore implements Store {
|
|||
// Comparing KeyValues
|
||||
private final KeyValue.KVComparator comparator;
|
||||
|
||||
final Compactor compactor;
|
||||
|
||||
final StoreEngine<?, ?, ?> storeEngine;
|
||||
|
||||
private OffPeakCompactions offPeakCompactions;
|
||||
|
||||
private static final int DEFAULT_FLUSH_RETRIES_NUMBER = 10;
|
||||
|
@ -223,8 +223,11 @@ public class HStore implements Store {
|
|||
"hbase.hstore.close.check.interval", 10*1000*1000 /* 10 MB */);
|
||||
}
|
||||
|
||||
StoreEngine engine = StoreEngine.create(this, this.conf, this.comparator);
|
||||
this.storeFileManager = engine.getStoreFileManager();
|
||||
storeEngine = StoreEngine.create(this, this.conf, this.comparator);
|
||||
// Copy some things to local fields for convenience.
|
||||
this.storeFileManager = storeEngine.getStoreFileManager();
|
||||
this.compactionPolicy = storeEngine.getCompactionPolicy();
|
||||
|
||||
this.storeFileManager.loadFiles(loadStoreFiles());
|
||||
|
||||
// Initialize checksum type from name. The names are CRC32, CRC32C, etc.
|
||||
|
@ -243,9 +246,6 @@ public class HStore implements Store {
|
|||
+ HStore.flush_retries_number);
|
||||
}
|
||||
}
|
||||
this.compactionPolicy = engine.getCompactionPolicy();
|
||||
// Get the compaction tool instance for this policy
|
||||
this.compactor = engine.getCompactor();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -1067,15 +1067,15 @@ public class HStore implements Store {
|
|||
* <p>We don't want to hold the structureLock for the whole time, as a compact()
|
||||
* can be lengthy and we want to allow cache-flushes during this period.
|
||||
*
|
||||
* @param cr
|
||||
* compaction details obtained from requestCompaction()
|
||||
* @param compaction compaction details obtained from requestCompaction()
|
||||
* @throws IOException
|
||||
* @return Storefile we compacted into or null if we failed or opted out early.
|
||||
*/
|
||||
List<StoreFile> compact(CompactionRequest cr) throws IOException {
|
||||
if (cr == null || cr.getFiles().isEmpty()) return null;
|
||||
Preconditions.checkArgument(cr.getStore().toString().equals(this.toString()));
|
||||
List<StoreFile> filesToCompact = cr.getFiles();
|
||||
public List<StoreFile> compact(CompactionContext compaction) throws IOException {
|
||||
assert compaction != null && compaction.hasSelection();
|
||||
CompactionRequest cr = compaction.getRequest();
|
||||
Collection<StoreFile> filesToCompact = cr.getFiles();
|
||||
assert !filesToCompact.isEmpty();
|
||||
synchronized (filesCompacting) {
|
||||
// sanity check: we're compacting files that this store knows about
|
||||
// TODO: change this to LOG.error() after more debugging
|
||||
|
@ -1091,16 +1091,20 @@ public class HStore implements Store {
|
|||
List<StoreFile> sfs = new ArrayList<StoreFile>();
|
||||
long compactionStartTime = EnvironmentEdgeManager.currentTimeMillis();
|
||||
try {
|
||||
List<Path> newFiles = this.compactor.compact(cr);
|
||||
// Commence the compaction.
|
||||
List<Path> newFiles = compaction.compact();
|
||||
// Move the compaction into place.
|
||||
if (this.conf.getBoolean("hbase.hstore.compaction.complete", true)) {
|
||||
for (Path newFile: newFiles) {
|
||||
StoreFile sf = completeCompaction(filesToCompact, newFile);
|
||||
assert newFile != null;
|
||||
StoreFile sf = moveFileIntoPlace(newFile);
|
||||
if (region.getCoprocessorHost() != null) {
|
||||
region.getCoprocessorHost().postCompact(this, sf, cr);
|
||||
}
|
||||
assert sf != null;
|
||||
sfs.add(sf);
|
||||
}
|
||||
completeCompaction(filesToCompact, sfs);
|
||||
} else {
|
||||
for (Path newFile: newFiles) {
|
||||
// Create storefile around what we wrote with a reader on it.
|
||||
|
@ -1111,15 +1115,24 @@ public class HStore implements Store {
|
|||
}
|
||||
}
|
||||
} finally {
|
||||
synchronized (filesCompacting) {
|
||||
filesCompacting.removeAll(filesToCompact);
|
||||
}
|
||||
finishCompactionRequest(cr);
|
||||
}
|
||||
logCompactionEndMessage(cr, sfs, compactionStartTime);
|
||||
return sfs;
|
||||
}
|
||||
|
||||
/**
|
||||
* Log a very elaborate compaction completion message.
|
||||
* @param cr Request.
|
||||
* @param sfs Resulting files.
|
||||
* @param compactionStartTime Start time.
|
||||
*/
|
||||
private void logCompactionEndMessage(
|
||||
CompactionRequest cr, List<StoreFile> sfs, long compactionStartTime) {
|
||||
long now = EnvironmentEdgeManager.currentTimeMillis();
|
||||
StringBuilder message = new StringBuilder(
|
||||
"Completed" + (cr.isMajor() ? " major " : " ") + "compaction of "
|
||||
+ filesToCompact.size() + " file(s) in " + this + " of "
|
||||
+ cr.getFiles().size() + " file(s) in " + this + " of "
|
||||
+ this.region.getRegionInfo().getRegionNameAsString()
|
||||
+ " into ");
|
||||
if (sfs.isEmpty()) {
|
||||
|
@ -1139,7 +1152,23 @@ public class HStore implements Store {
|
|||
.append(", and took ").append(StringUtils.formatTimeDiff(now, compactionStartTime))
|
||||
.append(" to execute.");
|
||||
LOG.info(message.toString());
|
||||
return sfs;
|
||||
}
|
||||
|
||||
// Package-visible for tests
|
||||
StoreFile moveFileIntoPlace(Path newFile) throws IOException {
|
||||
validateStoreFile(newFile);
|
||||
// Move the file into the right spot
|
||||
Path destPath = new Path(homedir, newFile.getName());
|
||||
LOG.info("Renaming compacted file at " + newFile + " to " + destPath);
|
||||
if (!fs.rename(newFile, destPath)) {
|
||||
String err = "Failed move of compacted file " + newFile + " to " + destPath;
|
||||
LOG.error(err);
|
||||
throw new IOException(err);
|
||||
}
|
||||
StoreFile result = new StoreFile(this.fs, destPath, this.conf, this.cacheConf,
|
||||
this.family.getBloomFilterType(), this.dataBlockEncoder);
|
||||
result.createReader();
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -1181,13 +1210,17 @@ public class HStore implements Store {
|
|||
|
||||
try {
|
||||
// Ready to go. Have list of files to compact.
|
||||
List<Path> newFiles = this.compactor.compactForTesting(filesToCompact, isMajor);
|
||||
List<Path> newFiles =
|
||||
this.storeEngine.getCompactor().compactForTesting(filesToCompact, isMajor);
|
||||
for (Path newFile: newFiles) {
|
||||
// Move the compaction into place.
|
||||
StoreFile sf = completeCompaction(filesToCompact, newFile);
|
||||
StoreFile sf = moveFileIntoPlace(newFile);
|
||||
if (region.getCoprocessorHost() != null) {
|
||||
region.getCoprocessorHost().postCompact(this, sf, null);
|
||||
}
|
||||
ArrayList<StoreFile> tmp = new ArrayList<StoreFile>();
|
||||
tmp.add(sf);
|
||||
completeCompaction(filesToCompact, tmp);
|
||||
}
|
||||
} finally {
|
||||
synchronized (filesCompacting) {
|
||||
|
@ -1203,7 +1236,7 @@ public class HStore implements Store {
|
|||
|
||||
@Override
|
||||
public CompactionProgress getCompactionProgress() {
|
||||
return this.compactor.getProgress();
|
||||
return this.storeEngine.getCompactor().getProgress();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -1219,100 +1252,102 @@ public class HStore implements Store {
|
|||
}
|
||||
|
||||
@Override
|
||||
public CompactionRequest requestCompaction() throws IOException {
|
||||
public CompactionContext requestCompaction() throws IOException {
|
||||
return requestCompaction(Store.NO_PRIORITY, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompactionRequest requestCompaction(int priority, CompactionRequest request)
|
||||
public CompactionContext requestCompaction(int priority, CompactionRequest baseRequest)
|
||||
throws IOException {
|
||||
// don't even select for compaction if writes are disabled
|
||||
if (!this.region.areWritesEnabled()) {
|
||||
return null;
|
||||
}
|
||||
|
||||
CompactionContext compaction = storeEngine.createCompaction();
|
||||
this.lock.readLock().lock();
|
||||
try {
|
||||
List<StoreFile> candidates = Lists.newArrayList(storeFileManager.getStorefiles());
|
||||
synchronized (filesCompacting) {
|
||||
// First we need to pre-select compaction, and then pre-compact selection!
|
||||
candidates = compactionPolicy.preSelectCompaction(candidates, filesCompacting);
|
||||
boolean override = false;
|
||||
// First, see if coprocessor would want to override selection.
|
||||
if (region.getCoprocessorHost() != null) {
|
||||
override = region.getCoprocessorHost().preCompactSelection(this, candidates, request);
|
||||
}
|
||||
CompactSelection filesToCompact;
|
||||
if (override) {
|
||||
// coprocessor is overriding normal file selection
|
||||
filesToCompact = new CompactSelection(candidates);
|
||||
} else {
|
||||
boolean isUserCompaction = priority == Store.PRIORITY_USER;
|
||||
boolean mayUseOffPeak = this.offPeakCompactions.tryStartOffPeakRequest();
|
||||
filesToCompact = compactionPolicy.selectCompaction(candidates, isUserCompaction,
|
||||
mayUseOffPeak, forceMajor && filesCompacting.isEmpty());
|
||||
if (mayUseOffPeak && !filesToCompact.isOffPeakCompaction()) {
|
||||
// Compaction policy doesn't want to do anything with off-peak.
|
||||
this.offPeakCompactions.endOffPeakRequest();
|
||||
List<StoreFile> candidatesForCoproc = compaction.preSelect(this.filesCompacting);
|
||||
boolean override = region.getCoprocessorHost().preCompactSelection(
|
||||
this, candidatesForCoproc, baseRequest);
|
||||
if (override) {
|
||||
// Coprocessor is overriding normal file selection.
|
||||
compaction.forceSelect(new CompactionRequest(candidatesForCoproc));
|
||||
}
|
||||
}
|
||||
|
||||
// Normal case - coprocessor is not overriding file selection.
|
||||
if (!compaction.hasSelection()) {
|
||||
boolean isUserCompaction = priority == Store.PRIORITY_USER;
|
||||
boolean mayUseOffPeak = this.offPeakCompactions.tryStartOffPeakRequest();
|
||||
compaction.select(this.filesCompacting, isUserCompaction,
|
||||
mayUseOffPeak, forceMajor && filesCompacting.isEmpty());
|
||||
assert compaction.hasSelection();
|
||||
if (mayUseOffPeak && !compaction.getRequest().isOffPeak()) {
|
||||
// Compaction policy doesn't want to take advantage of off-peak.
|
||||
this.offPeakCompactions.endOffPeakRequest();
|
||||
}
|
||||
}
|
||||
if (region.getCoprocessorHost() != null) {
|
||||
region.getCoprocessorHost().postCompactSelection(this,
|
||||
ImmutableList.copyOf(filesToCompact.getFilesToCompact()), request);
|
||||
region.getCoprocessorHost().postCompactSelection(
|
||||
this, ImmutableList.copyOf(compaction.getRequest().getFiles()), baseRequest);
|
||||
}
|
||||
|
||||
// no files to compact
|
||||
if (filesToCompact.getFilesToCompact().isEmpty()) {
|
||||
// Selected files; see if we have a compaction with some custom base request.
|
||||
if (baseRequest != null) {
|
||||
// Update the request with what the system thinks the request should be;
|
||||
// its up to the request if it wants to listen.
|
||||
compaction.forceSelect(
|
||||
baseRequest.combineWith(compaction.getRequest()));
|
||||
}
|
||||
|
||||
// Finally, we have the resulting files list. Check if we have any files at all.
|
||||
final Collection<StoreFile> selectedFiles = compaction.getRequest().getFiles();
|
||||
if (selectedFiles.isEmpty()) {
|
||||
return null;
|
||||
}
|
||||
|
||||
// basic sanity check: do not try to compact the same StoreFile twice.
|
||||
if (!Collections.disjoint(filesCompacting, filesToCompact.getFilesToCompact())) {
|
||||
// Update filesCompacting (check that we do not try to compact the same StoreFile twice).
|
||||
if (!Collections.disjoint(filesCompacting, selectedFiles)) {
|
||||
// TODO: change this from an IAE to LOG.error after sufficient testing
|
||||
Preconditions.checkArgument(false, "%s overlaps with %s",
|
||||
filesToCompact, filesCompacting);
|
||||
selectedFiles, filesCompacting);
|
||||
}
|
||||
filesCompacting.addAll(filesToCompact.getFilesToCompact());
|
||||
filesCompacting.addAll(selectedFiles);
|
||||
Collections.sort(filesCompacting, StoreFile.Comparators.SEQ_ID);
|
||||
|
||||
boolean isMajor =
|
||||
(filesToCompact.getFilesToCompact().size() == this.getStorefilesCount());
|
||||
if (isMajor) {
|
||||
// since we're enqueuing a major, update the compaction wait interval
|
||||
this.forceMajor = false;
|
||||
}
|
||||
// If we're enqueuing a major, clear the force flag.
|
||||
boolean isMajor = selectedFiles.size() == this.getStorefilesCount();
|
||||
this.forceMajor = this.forceMajor && !isMajor;
|
||||
|
||||
LOG.debug(getRegionInfo().getEncodedName() + " - " +
|
||||
getColumnFamilyName() + ": Initiating " +
|
||||
(isMajor ? "major" : "minor") + " compaction");
|
||||
|
||||
// everything went better than expected. create a compaction request
|
||||
int pri = getCompactPriority(priority);
|
||||
//not a special compaction request, so we need to make one
|
||||
if(request == null){
|
||||
request = new CompactionRequest(region, this, filesToCompact, isMajor, pri);
|
||||
}else{
|
||||
//update the request with what the system thinks the request should be
|
||||
//its up to the request if it wants to listen
|
||||
request.setSelection(filesToCompact);
|
||||
request.setIsMajor(isMajor);
|
||||
request.setPriority(pri);
|
||||
}
|
||||
// Set common request properties.
|
||||
compaction.getRequest().setPriority(getCompactPriority(priority));
|
||||
compaction.getRequest().setIsMajor(isMajor);
|
||||
compaction.getRequest().setDescription(
|
||||
region.getRegionNameAsString(), getColumnFamilyName());
|
||||
}
|
||||
} finally {
|
||||
this.lock.readLock().unlock();
|
||||
}
|
||||
if (request != null) {
|
||||
this.region.reportCompactionRequestStart(request.isMajor());
|
||||
}
|
||||
return request;
|
||||
|
||||
LOG.debug(getRegionInfo().getEncodedName() + " - " + getColumnFamilyName() + ": Initiating "
|
||||
+ (compaction.getRequest().isMajor() ? "major" : "minor") + " compaction");
|
||||
this.region.reportCompactionRequestStart(compaction.getRequest().isMajor());
|
||||
return compaction;
|
||||
}
|
||||
|
||||
public void finishRequest(CompactionRequest cr) {
|
||||
public void cancelRequestedCompaction(CompactionContext compaction) {
|
||||
finishCompactionRequest(compaction.getRequest());
|
||||
}
|
||||
|
||||
private void finishCompactionRequest(CompactionRequest cr) {
|
||||
this.region.reportCompactionRequestEnd(cr.isMajor());
|
||||
if (cr.getCompactSelection().isOffPeakCompaction()) {
|
||||
if (cr.isOffPeak()) {
|
||||
this.offPeakCompactions.endOffPeakRequest();
|
||||
cr.getCompactSelection().setOffPeak(false);
|
||||
cr.setOffPeak(false);
|
||||
}
|
||||
synchronized (filesCompacting) {
|
||||
filesCompacting.removeAll(cr.getFiles());
|
||||
|
@ -1363,28 +1398,8 @@ public class HStore implements Store {
|
|||
* @return StoreFile created. May be null.
|
||||
* @throws IOException
|
||||
*/
|
||||
StoreFile completeCompaction(final Collection<StoreFile> compactedFiles,
|
||||
final Path newFile)
|
||||
throws IOException {
|
||||
// 1. Moving the new files into place -- if there is a new file (may not
|
||||
// be if all cells were expired or deleted).
|
||||
StoreFile result = null;
|
||||
if (newFile != null) {
|
||||
validateStoreFile(newFile);
|
||||
// Move the file into the right spot
|
||||
Path destPath = new Path(homedir, newFile.getName());
|
||||
LOG.info("Renaming compacted file at " + newFile + " to " + destPath);
|
||||
if (!fs.rename(newFile, destPath)) {
|
||||
LOG.error("Failed move of compacted file " + newFile + " to " +
|
||||
destPath);
|
||||
throw new IOException("Failed move of compacted file " + newFile +
|
||||
" to " + destPath);
|
||||
}
|
||||
result = new StoreFile(this.fs, destPath, this.conf, this.cacheConf,
|
||||
this.family.getBloomFilterType(), this.dataBlockEncoder);
|
||||
result.createReader();
|
||||
}
|
||||
|
||||
private void completeCompaction(final Collection<StoreFile> compactedFiles,
|
||||
final Collection<StoreFile> result) throws IOException {
|
||||
try {
|
||||
this.lock.writeLock().lock();
|
||||
try {
|
||||
|
@ -1392,11 +1407,7 @@ public class HStore implements Store {
|
|||
// delete old store files until we have sent out notification of
|
||||
// change in case old files are still being accessed by outstanding
|
||||
// scanners.
|
||||
List<StoreFile> results = new ArrayList<StoreFile>(1);
|
||||
if (result != null) {
|
||||
results.add(result);
|
||||
}
|
||||
this.storeFileManager.addCompactionResults(compactedFiles, results);
|
||||
this.storeFileManager.addCompactionResults(compactedFiles, result);
|
||||
filesCompacting.removeAll(compactedFiles); // safe bc: lock.writeLock()
|
||||
} finally {
|
||||
// We need the lock, as long as we are updating the storeFiles
|
||||
|
@ -1418,8 +1429,8 @@ public class HStore implements Store {
|
|||
} catch (IOException e) {
|
||||
e = RemoteExceptionHandler.checkIOException(e);
|
||||
LOG.error("Failed replacing compacted files in " + this +
|
||||
". Compacted file is " + (result == null? "none": result.toString()) +
|
||||
". Files replaced " + compactedFiles.toString() +
|
||||
". Compacted files are " + (result == null? "none": result.toString()) +
|
||||
". Files replaced " + compactedFiles.toString() +
|
||||
" some of which may have been already removed", e);
|
||||
}
|
||||
|
||||
|
@ -1435,7 +1446,6 @@ public class HStore implements Store {
|
|||
this.storeSize += r.length();
|
||||
this.totalUncompressedBytes += r.getTotalUncompressedBytes();
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
/*
|
||||
|
|
|
@@ -55,7 +55,6 @@ import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.filter.ByteArrayComparable;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.regionserver.compactions.CompactSelection;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;

@@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;

@@ -157,12 +158,14 @@ public interface Store extends HeapSize, StoreConfigInformation {
*/
public CompactionProgress getCompactionProgress();

public CompactionRequest requestCompaction() throws IOException;
public CompactionContext requestCompaction() throws IOException;

public CompactionRequest requestCompaction(int priority, CompactionRequest request)
public CompactionContext requestCompaction(int priority, CompactionRequest baseRequest)
throws IOException;

public void finishRequest(CompactionRequest cr);
public void cancelRequestedCompaction(CompactionContext compaction);

public List<StoreFile> compact(CompactionContext compaction) throws IOException;

/**
* @return true if we should run a major compaction.

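Taken together, the new Store methods imply a request/execute/cancel contract: requestCompaction() hands back a CompactionContext (or null when the policy selects nothing), compact() executes it, and a selection that will never be executed must be released with cancelRequestedCompaction() so the store can drop it from filesCompacting and give back any off-peak slot. A hedged sketch of that contract, mirroring how HRegion and CompactSplitThread use it elsewhere in this commit; the helper method and the writability flag are illustrative.

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;

public class StoreCompactionSketch {
  /** Selects and runs at most one compaction on the store; returns the new files or null. */
  static List<StoreFile> compactOnce(Store store, boolean writesEnabled) throws IOException {
    CompactionContext compaction = store.requestCompaction(Store.NO_PRIORITY, null);
    if (compaction == null) {
      return null; // nothing selected
    }
    if (!writesEnabled) {
      // Bailing out before execution: release the selection, as HRegion.compact() does
      // when the region is closing or writes are disabled.
      store.cancelRequestedCompaction(compaction);
      return null;
    }
    // Store.compact() cleans up its own bookkeeping whether it completes or throws.
    return store.compact(compaction);
  }
}
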
@ -24,8 +24,11 @@ import java.io.IOException;
|
|||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.KeyValue.KVComparator;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.CompactionPolicy;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.Compactor;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactionPolicy;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
|
||||
import org.apache.hadoop.hbase.util.ReflectionUtils;
|
||||
|
||||
/**
|
||||
|
@ -34,14 +37,15 @@ import org.apache.hadoop.hbase.util.ReflectionUtils;
|
|||
* they are tied together and replaced together via StoreEngine-s.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public abstract class StoreEngine {
|
||||
public abstract class StoreEngine<
|
||||
CP extends CompactionPolicy, C extends Compactor, SFM extends StoreFileManager> {
|
||||
protected final Store store;
|
||||
protected final Configuration conf;
|
||||
protected final KVComparator comparator;
|
||||
|
||||
private final PP<CompactionPolicy> compactionPolicy = new PP<CompactionPolicy>();
|
||||
private final PP<Compactor> compactor = new PP<Compactor>();
|
||||
private final PP<StoreFileManager> storeFileManager = new PP<StoreFileManager>();
|
||||
protected CP compactionPolicy;
|
||||
protected C compactor;
|
||||
protected SFM storeFileManager;
|
||||
private boolean isInitialized = false;
|
||||
|
||||
/**
|
||||
|
@ -50,7 +54,7 @@ public abstract class StoreEngine {
|
|||
*/
|
||||
public static final String STORE_ENGINE_CLASS_KEY = "hbase.hstore.engine.class";
|
||||
|
||||
private static final Class<? extends StoreEngine>
|
||||
private static final Class<? extends StoreEngine<?, ?, ?>>
|
||||
DEFAULT_STORE_ENGINE_CLASS = DefaultStoreEngine.class;
|
||||
|
||||
/**
|
||||
|
@ -58,7 +62,7 @@ public abstract class StoreEngine {
|
|||
*/
|
||||
public CompactionPolicy getCompactionPolicy() {
|
||||
createComponentsOnce();
|
||||
return this.compactionPolicy.get();
|
||||
return this.compactionPolicy;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -66,7 +70,7 @@ public abstract class StoreEngine {
|
|||
*/
|
||||
public Compactor getCompactor() {
|
||||
createComponentsOnce();
|
||||
return this.compactor.get();
|
||||
return this.compactor;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -74,7 +78,7 @@ public abstract class StoreEngine {
|
|||
*/
|
||||
public StoreFileManager getStoreFileManager() {
|
||||
createComponentsOnce();
|
||||
return this.storeFileManager.get();
|
||||
return this.storeFileManager;
|
||||
}
|
||||
|
||||
protected StoreEngine(Configuration conf, Store store, KVComparator comparator) {
|
||||
|
@ -83,18 +87,22 @@ public abstract class StoreEngine {
|
|||
this.comparator = comparator;
|
||||
}
|
||||
|
||||
public CompactionContext createCompaction() {
|
||||
createComponentsOnce();
|
||||
return this.createCompactionContext();
|
||||
}
|
||||
|
||||
protected abstract CompactionContext createCompactionContext();
|
||||
|
||||
/**
|
||||
* Create the StoreEngine's components.
|
||||
* @param storeFileManager out parameter for StoreFileManager.
|
||||
* @param compactionPolicy out parameter for CompactionPolicy.
|
||||
* @param compactor out parameter for Compactor.
|
||||
*/
|
||||
protected abstract void createComponents(PP<StoreFileManager> storeFileManager,
|
||||
PP<CompactionPolicy> compactionPolicy, PP<Compactor> compactor);
|
||||
protected abstract void createComponents();
|
||||
|
||||
private void createComponentsOnce() {
|
||||
if (isInitialized) return;
|
||||
createComponents(storeFileManager, compactionPolicy, compactor);
|
||||
createComponents();
|
||||
assert compactor != null && compactionPolicy != null && storeFileManager != null;
|
||||
isInitialized = true;
|
||||
}
|
||||
|
||||
|
@ -117,18 +125,4 @@ public abstract class StoreEngine {
|
|||
throw new IOException("Unable to load configured store engine '" + className + "'", e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* To allow StoreEngine-s to have custom dependencies between 3 components, we want to create
|
||||
* them in one place. To return multiple, simulate C++ pointer to pointers/C# out params.
|
||||
*/
|
||||
protected static class PP<T> {
|
||||
private T t = null;
|
||||
public void set(T t) {
|
||||
this.t = t;
|
||||
}
|
||||
public T get() {
|
||||
return this.t;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@@ -39,7 +39,7 @@ import com.google.common.collect.ImmutableCollection;
* Implementations are assumed to be not thread safe.
*/
@InterfaceAudience.Private
interface StoreFileManager {
public interface StoreFileManager {
/**
* Loads the initial store files into empty StoreFileManager.
* @param storeFiles The files to load.
@@ -56,7 +56,6 @@ interface StoreFileManager {
* Adds compaction results into the structure.
* @param compactedFiles The input files for the compaction.
* @param results The resulting files for the compaction.
* @return The files that can be removed from storage. Usually,
*/
public abstract void addCompactionResults(
Collection<StoreFile> compactedFiles, Collection<StoreFile> results);

@ -1,79 +0,0 @@
|
|||
/**
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.regionserver.compactions;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.regionserver.StoreFile;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
|
||||
@InterfaceAudience.Private
|
||||
public class CompactSelection {
|
||||
private static final long serialVersionUID = 1L;
|
||||
static final Log LOG = LogFactory.getLog(CompactSelection.class);
|
||||
// the actual list - this is needed to handle methods like "sublist" correctly
|
||||
List<StoreFile> filesToCompact = new ArrayList<StoreFile>();
|
||||
// was this compaction promoted to an off-peak
|
||||
boolean isOffPeakCompaction = false;
|
||||
// CompactSelection object creation time.
|
||||
private final long selectionTime;
|
||||
|
||||
public CompactSelection(List<StoreFile> filesToCompact) {
|
||||
this.selectionTime = EnvironmentEdgeManager.currentTimeMillis();
|
||||
this.filesToCompact = filesToCompact;
|
||||
this.isOffPeakCompaction = false;
|
||||
}
|
||||
|
||||
public List<StoreFile> getFilesToCompact() {
|
||||
return filesToCompact;
|
||||
}
|
||||
|
||||
/**
|
||||
* Removes all files from the current compaction list, and resets off peak
|
||||
* compactions is set.
|
||||
*/
|
||||
public void emptyFileList() {
|
||||
filesToCompact.clear();
|
||||
}
|
||||
|
||||
public boolean isOffPeakCompaction() {
|
||||
return this.isOffPeakCompaction;
|
||||
}
|
||||
|
||||
public void setOffPeak(boolean value) {
|
||||
this.isOffPeakCompaction = value;
|
||||
}
|
||||
|
||||
public long getSelectionTime() {
|
||||
return selectionTime;
|
||||
}
|
||||
|
||||
public CompactSelection getSubList(int start, int end) {
|
||||
filesToCompact = filesToCompact.subList(start, end);
|
||||
return this;
|
||||
}
|
||||
|
||||
public void clearSubList(int start, int end) {
|
||||
filesToCompact.subList(start, end).clear();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,80 @@
|
|||
/**
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.regionserver.compactions;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.regionserver.StoreFile;
|
||||
|
||||
|
||||
/**
|
||||
* This class holds all "physical" details necessary to run a compaction.
|
||||
* It also has compaction request with all the logical details.
|
||||
* Hence, this class is basically the compaction.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public abstract class CompactionContext {
|
||||
protected CompactionRequest request = null;
|
||||
|
||||
/**
|
||||
* Called before coprocessor preCompactSelection and should filter the candidates
|
||||
* for coprocessor; i.e. exclude the files that definitely cannot be compacted at this time.
|
||||
* @param filesCompacting files currently compacting
|
||||
* @return the list of files that can theoretically be compacted.
|
||||
*/
|
||||
public abstract List<StoreFile> preSelect(final List<StoreFile> filesCompacting);
|
||||
|
||||
/**
|
||||
* Called to select files for compaction. Must fill in the request field if successful.
|
||||
* @param filesCompacting Files currently being compacted by other compactions.
|
||||
* @param isUserCompaction Whether this is a user compaction.
|
||||
* @param mayUseOffPeak Whether the underlying policy may assume it's off-peak hours.
|
||||
* @param forceMajor Whether to force major compaction.
|
||||
* @return Whether the selection succeeded. Selection may be empty and lead to no compaction.
|
||||
*/
|
||||
public abstract boolean select(
|
||||
final List<StoreFile> filesCompacting, final boolean isUserCompaction,
|
||||
final boolean mayUseOffPeak, final boolean forceMajor) throws IOException;
|
||||
|
||||
/**
|
||||
* Forces external selection to be applied for this compaction.
|
||||
* @param request The pre-cooked request with selection and other settings.
|
||||
*/
|
||||
public void forceSelect(CompactionRequest request) {
|
||||
this.request = request;
|
||||
}
|
||||
|
||||
/**
|
||||
* Runs the compaction based on current selection. select/forceSelect must have been called.
|
||||
* @return The new file paths resulting from compaction.
|
||||
*/
|
||||
public abstract List<Path> compact() throws IOException;
|
||||
|
||||
public CompactionRequest getRequest() {
|
||||
assert hasSelection();
|
||||
return this.request;
|
||||
}
|
||||
|
||||
public boolean hasSelection() {
|
||||
return this.request != null;
|
||||
}
|
||||
}
|
|
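CompactionContext is the new handle that carries a compaction from selection to execution. The sketch below strings its calls together in the order HStore and DefaultStoreEngine use them in this commit: preSelect() trims the candidates shown to coprocessors, forceSelect() or select() fills in the request, and compact() does the physical work once hasSelection() holds. The driver method and its parameters are illustrative only.

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;

public class CompactionContextLifecycleSketch {
  /** Drives one context end to end; returns the produced file paths, or null if nothing selected. */
  static List<Path> runOnce(CompactionContext context, List<StoreFile> filesCompacting,
      CompactionRequest externallyForcedRequest) throws IOException {
    // 1. Candidates a coprocessor would be allowed to see (preCompactSelection hook).
    List<StoreFile> candidates = context.preSelect(filesCompacting);
    // 2. Either an external selection is imposed, or the policy is asked to pick.
    if (externallyForcedRequest != null) {
      context.forceSelect(externallyForcedRequest);
    } else if (!context.select(filesCompacting, false /* isUserCompaction */,
        false /* mayUseOffPeak */, false /* forceMajor */)) {
      return null; // selection did not succeed
    }
    assert context.hasSelection();
    // 3. The request now holds the logical details; compact() performs the physical work.
    return context.compact();
  }
}
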
@@ -42,25 +42,6 @@ public abstract class CompactionPolicy {
this.comConf = new CompactionConfiguration(conf, this.storeConfigInfo);
}

/**
* This is called before coprocessor preCompactSelection and should filter the candidates
* for coprocessor; i.e. exclude the files that definitely cannot be compacted at this time.
* @param candidateFiles candidate files, ordered from oldest to newest
* @param filesCompacting files currently compacting
* @return the list of files that can theoretically be compacted.
*/
public abstract List<StoreFile> preSelectCompaction(
List<StoreFile> candidateFiles, final List<StoreFile> filesCompacting);

/**
* @param candidateFiles candidate files, ordered from oldest to newest
* @return subset copy of candidate list that meets compaction criteria
* @throws java.io.IOException
*/
public abstract CompactSelection selectCompaction(
final List<StoreFile> candidateFiles, final boolean isUserCompaction,
final boolean mayUseOffPeak, final boolean forceMajor) throws IOException;

/**
* @param storeFiles Store files in the store.
* @return The system compaction priority of the store, based on storeFiles.

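The reworked CompactionRequest in the next file is the piece coprocessors are expected to subclass: it gains a public no-argument constructor, beforeExecute()/afterExecute() hooks invoked by CompactSplitThread around the run, and combineWith() so the policy's selection can be merged into a pre-cooked request. A hedged sketch of such a subclass; the timing logic and log message are illustrative, not part of the patch.

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;

/** Hypothetical coprocessor-side request that records how long its compaction ran. */
public class TimedCompactionRequest extends CompactionRequest {
  private static final Log LOG = LogFactory.getLog(TimedCompactionRequest.class);
  private volatile long startMillis;

  public TimedCompactionRequest() {
    super(); // the no-argument constructor is the one intended for coprocessor subclasses
  }

  @Override
  public void beforeExecute() {
    startMillis = System.currentTimeMillis();
  }

  @Override
  public void afterExecute() {
    LOG.info("Timed compaction took " + (System.currentTimeMillis() - startMillis) + " ms");
  }

  @Override
  public CompactionRequest combineWith(CompactionRequest other) {
    // Default behavior copies the system's selection into this request, which keeps
    // this subclass (and its hooks) as the object that actually runs.
    return super.combineWith(other);
  }
}
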
@ -18,22 +18,13 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.regionserver.compactions;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.RejectedExecutionHandler;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.hbase.RemoteExceptionHandler;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegionServer;
|
||||
import org.apache.hadoop.hbase.regionserver.HStore;
|
||||
import org.apache.hadoop.hbase.regionserver.Store;
|
||||
import org.apache.hadoop.hbase.regionserver.StoreFile;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
|
@ -46,231 +37,193 @@ import com.google.common.base.Predicate;
|
|||
import com.google.common.collect.Collections2;
|
||||
|
||||
/**
|
||||
* This class holds all details necessary to run a compaction.
|
||||
* This class holds all logical details necessary to run a compaction.
|
||||
*/
|
||||
@InterfaceAudience.LimitedPrivate({ "coprocessor" })
|
||||
@InterfaceStability.Evolving
|
||||
public class CompactionRequest implements Comparable<CompactionRequest>,
|
||||
Runnable {
|
||||
static final Log LOG = LogFactory.getLog(CompactionRequest.class);
|
||||
private final HRegion region;
|
||||
private final HStore store;
|
||||
private CompactSelection compactSelection;
|
||||
private long totalSize;
|
||||
private boolean isMajor;
|
||||
private int priority;
|
||||
private final Long timeInNanos;
|
||||
private HRegionServer server = null;
|
||||
public class CompactionRequest implements Comparable<CompactionRequest> {
|
||||
static final Log LOG = LogFactory.getLog(CompactionRequest.class);
|
||||
// was this compaction promoted to an off-peak
|
||||
private boolean isOffPeak = false;
|
||||
private boolean isMajor = false;
|
||||
private int priority = Store.NO_PRIORITY;
|
||||
private Collection<StoreFile> filesToCompact;
|
||||
|
||||
public static CompactionRequest getRequestForTesting(Collection<StoreFile> selection,
|
||||
boolean isMajor) {
|
||||
return new CompactionRequest(null, null, new CompactSelection(new ArrayList<StoreFile>(
|
||||
selection)), isMajor, 0, System.nanoTime());
|
||||
// CompactRequest object creation time.
|
||||
private long selectionTime;
|
||||
// System time used to compare objects in FIFO order. TODO: maybe use selectionTime?
|
||||
private Long timeInNanos;
|
||||
private String regionName = "";
|
||||
private String storeName = "";
|
||||
private long totalSize = -1L;
|
||||
|
||||
/**
* This ctor should be used by coprocessors that want to subclass CompactionRequest.
*/
public CompactionRequest() {
this.selectionTime = EnvironmentEdgeManager.currentTimeMillis();
this.timeInNanos = System.nanoTime();
}

public CompactionRequest(Collection<StoreFile> files) {
this();
Preconditions.checkNotNull(files);
this.filesToCompact = files;
recalculateSize();
}

/**
* Called before compaction is executed by CompactSplitThread; for use by coproc subclasses.
*/
public void beforeExecute() {}

/**
* Called after compaction is executed by CompactSplitThread; for use by coproc subclasses.
*/
public void afterExecute() {}

/**
* Combines the request with other request. Coprocessors subclassing CR may override
* this if they want to do clever things based on CompactionPolicy selection that
* is passed to this method via "other". The default implementation just does a copy.
* @param other Request to combine with.
* @return The result (may be "this" or "other").
*/
public CompactionRequest combineWith(CompactionRequest other) {
this.filesToCompact = new ArrayList<StoreFile>(other.getFiles());
this.isOffPeak = other.isOffPeak;
this.isMajor = other.isMajor;
this.priority = other.priority;
this.selectionTime = other.selectionTime;
this.timeInNanos = other.timeInNanos;
this.regionName = other.regionName;
this.storeName = other.storeName;
this.totalSize = other.totalSize;
return this;
}

/**
* This function will define where in the priority queue the request will
* end up. Those with the highest priorities will be first. When the
* priorities are the same it will first compare priority then date
* to maintain a FIFO functionality.
*
* <p>Note: The date is only accurate to the millisecond which means it is
* possible that two requests were inserted into the queue within a
* millisecond. When that is the case this function will break the tie
* arbitrarily.
*/
@Override
public int compareTo(CompactionRequest request) {
//NOTE: The head of the priority queue is the least element
if (this.equals(request)) {
return 0; //they are the same request
}
int compareVal;

compareVal = priority - request.priority; //compare priority
if (compareVal != 0) {
return compareVal;
}

/**
* Constructor for a custom compaction. Uses the setXXX methods to update the state of the
* compaction before being used.
*/
public CompactionRequest(HRegion region, HStore store, int priority) {
this(region, store, null, false, priority, System
.nanoTime());
compareVal = timeInNanos.compareTo(request.timeInNanos);
if (compareVal != 0) {
return compareVal;
}

public CompactionRequest(HRegion r, HStore s, CompactSelection files, boolean isMajor, int p) {
// delegate to the internal constructor after checking basic preconditions
this(Preconditions.checkNotNull(r), s, Preconditions.checkNotNull(files), isMajor, p, System
.nanoTime());
}
// break the tie based on hash code
return this.hashCode() - request.hashCode();
}

private CompactionRequest(HRegion region, HStore store, CompactSelection files, boolean isMajor,
int priority, long startTime) {
this.region = region;
this.store = store;
this.isMajor = isMajor;
this.priority = priority;
this.timeInNanos = startTime;
if (files != null) {
this.setSelection(files);
}
}
@Override
public boolean equals(Object obj) {
return (this == obj);
}

/**
* This function will define where in the priority queue the request will
* end up. Those with the highest priorities will be first. When the
* priorities are the same it will first compare priority then date
* to maintain a FIFO functionality.
*
* <p>Note: The date is only accurate to the millisecond which means it is
* possible that two requests were inserted into the queue within a
* millisecond. When that is the case this function will break the tie
* arbitrarily.
*/
@Override
public int compareTo(CompactionRequest request) {
//NOTE: The head of the priority queue is the least element
if (this.equals(request)) {
return 0; //they are the same request
}
int compareVal;
public Collection<StoreFile> getFiles() {
return this.filesToCompact;
}

compareVal = priority - request.priority; //compare priority
if (compareVal != 0) {
return compareVal;
}
/**
* Sets the region/store name, for logging.
*/
public void setDescription(String regionName, String storeName) {
this.regionName = regionName;
this.storeName = storeName;
}

compareVal = timeInNanos.compareTo(request.timeInNanos);
if (compareVal != 0) {
return compareVal;
}
/** Gets the total size of all StoreFiles in compaction */
public long getSize() {
return totalSize;
}

// break the tie based on hash code
return this.hashCode() - request.hashCode();
}
public boolean isMajor() {
return this.isMajor;
}

@Override
public boolean equals(Object obj) {
return (this == obj);
}
/** Gets the priority for the request */
public int getPriority() {
return priority;
}

/** Gets the HRegion for the request */
public HRegion getHRegion() {
return region;
}
/** Sets the priority for the request */
public void setPriority(int p) {
this.priority = p;
}

/** Gets the Store for the request */
public HStore getStore() {
return store;
}
public boolean isOffPeak() {
return this.isOffPeak;
}

/** Gets the compact selection object for the request */
public CompactSelection getCompactSelection() {
return compactSelection;
}
public void setOffPeak(boolean value) {
this.isOffPeak = value;
}

/** Gets the StoreFiles for the request */
public List<StoreFile> getFiles() {
return compactSelection.getFilesToCompact();
}
public long getSelectionTime() {
return this.selectionTime;
}

/** Gets the total size of all StoreFiles in compaction */
public long getSize() {
return totalSize;
}
/**
* Specify if this compaction should be a major compaction based on the state of the store
* @param isMajor <tt>true</tt> if the system determines that this compaction should be a major
* compaction
*/
public void setIsMajor(boolean isMajor) {
this.isMajor = isMajor;
}

public boolean isMajor() {
return this.isMajor;
}

/** Gets the priority for the request */
public int getPriority() {
return priority;
}

public long getSelectionTime() {
return compactSelection.getSelectionTime();
}

/** Gets the priority for the request */
public void setPriority(int p) {
this.priority = p;
}

public void setServer(HRegionServer hrs) {
this.server = hrs;
}

/**
* Set the files (and, implicitly, the size of the compaction based on those files)
* @param files files that should be included in the compaction
*/
public void setSelection(CompactSelection files) {
long sz = 0;
for (StoreFile sf : files.getFilesToCompact()) {
sz += sf.getReader().length();
}
this.totalSize = sz;
this.compactSelection = files;
}

/**
* Specify if this compaction should be a major compaction based on the state of the store
* @param isMajor <tt>true</tt> if the system determines that this compaction should be a major
* compaction
*/
public void setIsMajor(boolean isMajor) {
this.isMajor = isMajor;
}

@Override
public String toString() {
String fsList = Joiner.on(", ").join(
Collections2.transform(Collections2.filter(
compactSelection.getFilesToCompact(),
new Predicate<StoreFile>() {
public boolean apply(StoreFile sf) {
return sf.getReader() != null;
}
}), new Function<StoreFile, String>() {
public String apply(StoreFile sf) {
return StringUtils.humanReadableInt(sf.getReader().length());
@Override
public String toString() {
String fsList = Joiner.on(", ").join(
Collections2.transform(Collections2.filter(
this.getFiles(),
new Predicate<StoreFile>() {
public boolean apply(StoreFile sf) {
return sf.getReader() != null;
}
}));

return "regionName=" + region.getRegionNameAsString() +
", storeName=" + new String(store.getFamily().getName()) +
", fileCount=" + compactSelection.getFilesToCompact().size() +
", fileSize=" + StringUtils.humanReadableInt(totalSize) +
((fsList.isEmpty()) ? "" : " (" + fsList + ")") +
", priority=" + priority + ", time=" + timeInNanos;
}

@Override
public void run() {
Preconditions.checkNotNull(server);
if (server.isStopped()) {
return;
}
try {
long start = EnvironmentEdgeManager.currentTimeMillis();
boolean completed = region.compact(this);
long now = EnvironmentEdgeManager.currentTimeMillis();
LOG.info(((completed) ? "completed" : "aborted") + " compaction: " +
this + "; duration=" + StringUtils.formatTimeDiff(now, start));
if (completed) {
// degenerate case: blocked regions require recursive enqueues
if (store.getCompactPriority() <= 0) {
server.compactSplitThread.requestCompaction(region, store, "Recursive enqueue", null);
} else {
// see if the compaction has caused us to exceed max region size
server.getCompactSplitThread().requestSplit(region);
}), new Function<StoreFile, String>() {
public String apply(StoreFile sf) {
return StringUtils.humanReadableInt(sf.getReader().length());
}
}
} catch (IOException ex) {
LOG.error("Compaction failed " + this, RemoteExceptionHandler
.checkIOException(ex));
server.checkFileSystem();
} catch (Exception ex) {
LOG.error("Compaction failed " + this, ex);
server.checkFileSystem();
} finally {
store.finishRequest(this);
LOG.debug("CompactSplitThread Status: " + server.compactSplitThread);
}
}
}));

/**
* Cleanup class to use when rejecting a compaction request from the queue.
*/
public static class Rejection implements RejectedExecutionHandler {
return "regionName=" + regionName + ", storeName=" + storeName +
", fileCount=" + this.getFiles().size() +
", fileSize=" + StringUtils.humanReadableInt(totalSize) +
((fsList.isEmpty()) ? "" : " (" + fsList + ")") +
", priority=" + priority + ", time=" + timeInNanos;
}

@Override
public void rejectedExecution(Runnable request, ThreadPoolExecutor pool) {
if (request instanceof CompactionRequest) {
CompactionRequest cr = (CompactionRequest) request;
LOG.debug("Compaction Rejected: " + cr);
cr.getStore().finishRequest(cr);
}
}
/**
* Recalculate the size of the compaction based on current files.
* @param files files that should be included in the compaction
*/
private void recalculateSize() {
long sz = 0;
for (StoreFile sf : this.filesToCompact) {
sz += sf.getReader().length();
}
this.totalSize = sz;
}
}

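Note on the hunk above: the reworked CompactionRequest drops the Runnable/CompactSelection plumbing and instead exposes a no-argument constructor plus beforeExecute()/afterExecute() hooks so coprocessors can subclass it. A minimal sketch of such a subclass follows; it is not part of this commit and the class name is made up, but it only uses methods shown in this diff and simply mirrors the TrackableCompactionRequest change further down.

import java.util.concurrent.CountDownLatch;

import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;

// Hypothetical subclass: lets a caller wait until CompactSplitThread has executed the request.
public class NotifyingCompactionRequest extends CompactionRequest {
  private final CountDownLatch done;

  public NotifyingCompactionRequest(CountDownLatch done) {
    super(); // the new no-arg constructor is intended for coprocessor subclasses
    this.done = done;
  }

  @Override
  public void afterExecute() {
    super.afterExecute();
    done.countDown(); // signal completion to the waiting caller
  }
}
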
@ -60,7 +60,9 @@ public abstract class Compactor {
*/
public List<Path> compactForTesting(final Collection<StoreFile> filesToCompact, boolean isMajor)
throws IOException {
return compact(CompactionRequest.getRequestForTesting(filesToCompact, isMajor));
CompactionRequest cr = new CompactionRequest(filesToCompact);
cr.setIsMajor(isMajor);
return this.compact(cr);
}

public CompactionProgress getProgress() {

@ -32,9 +32,11 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.StoreConfigInformation;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFileManager;
import org.apache.hadoop.hbase.regionserver.StoreUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;

@ -49,16 +51,15 @@ import com.google.common.collect.Collections2;
*/
@InterfaceAudience.Private
public class DefaultCompactionPolicy extends CompactionPolicy {

private static final Log LOG = LogFactory.getLog(DefaultCompactionPolicy.class);

public DefaultCompactionPolicy(Configuration conf, StoreConfigInformation storeConfigInfo) {
super(conf, storeConfigInfo);
}

@Override
public List<StoreFile> preSelectCompaction(
List<StoreFile> candidateFiles, final List<StoreFile> filesCompacting) {

private ArrayList<StoreFile> getCurrentEligibleFiles(
ArrayList<StoreFile> candidateFiles, final List<StoreFile> filesCompacting) {
// candidates = all storefiles not already in compaction queue
if (!filesCompacting.isEmpty()) {
// exclude all files older than the newest file we're currently

@ -71,6 +72,11 @@ public class DefaultCompactionPolicy extends CompactionPolicy {
return candidateFiles;
}

public List<StoreFile> preSelectCompactionForCoprocessor(
final Collection<StoreFile> candidates, final List<StoreFile> filesCompacting) {
return getCurrentEligibleFiles(new ArrayList<StoreFile>(candidates), filesCompacting);
}

@Override
public int getSystemCompactionPriority(final Collection<StoreFile> storeFiles) {
return this.comConf.getBlockingStorefileCount() - storeFiles.size();

@ -81,20 +87,20 @@ public class DefaultCompactionPolicy extends CompactionPolicy {
* @return subset copy of candidate list that meets compaction criteria
* @throws java.io.IOException
*/
@Override
public CompactSelection selectCompaction(List<StoreFile> candidateFiles,
final boolean isUserCompaction, final boolean mayUseOffPeak, final boolean forceMajor)
throws IOException {
public CompactionRequest selectCompaction(Collection<StoreFile> candidateFiles,
final List<StoreFile> filesCompacting, final boolean isUserCompaction,
final boolean mayUseOffPeak, final boolean forceMajor) throws IOException {
// Preliminary compaction subject to filters
CompactSelection candidateSelection = new CompactSelection(candidateFiles);
ArrayList<StoreFile> candidateSelection = new ArrayList<StoreFile>(candidateFiles);
candidateSelection = getCurrentEligibleFiles(candidateSelection, filesCompacting);
long cfTtl = this.storeConfigInfo.getStoreFileTtl();
if (!forceMajor) {
// If there are expired files, only select them so that compaction deletes them
if (comConf.shouldDeleteExpired() && (cfTtl != Long.MAX_VALUE)) {
CompactSelection expiredSelection = selectExpiredStoreFiles(
candidateSelection, EnvironmentEdgeManager.currentTimeMillis() - cfTtl);
ArrayList<StoreFile> expiredSelection = selectExpiredStoreFiles(
candidateSelection, EnvironmentEdgeManager.currentTimeMillis() - cfTtl);
if (expiredSelection != null) {
return expiredSelection;
return new CompactionRequest(expiredSelection);
}
}
candidateSelection = skipLargeFiles(candidateSelection);

@ -106,21 +112,23 @@ public class DefaultCompactionPolicy extends CompactionPolicy {
// Or, if there are any references among the candidates.
boolean majorCompaction = (
(forceMajor && isUserCompaction)
|| ((forceMajor || isMajorCompaction(candidateSelection.getFilesToCompact()))
&& (candidateSelection.getFilesToCompact().size() < comConf.getMaxFilesToCompact()))
|| StoreUtils.hasReferences(candidateSelection.getFilesToCompact())
|| ((forceMajor || isMajorCompaction(candidateSelection))
&& (candidateSelection.size() < comConf.getMaxFilesToCompact()))
|| StoreUtils.hasReferences(candidateSelection)
);

if (!majorCompaction) {
// we're doing a minor compaction, let's see what files are applicable
candidateSelection.setOffPeak(mayUseOffPeak);
candidateSelection = filterBulk(candidateSelection);
candidateSelection = applyCompactionPolicy(candidateSelection);
candidateSelection = applyCompactionPolicy(candidateSelection, mayUseOffPeak);
candidateSelection = checkMinFilesCriteria(candidateSelection);
}
candidateSelection =
removeExcessFiles(candidateSelection, isUserCompaction, majorCompaction);
return candidateSelection;
candidateSelection = removeExcessFiles(candidateSelection, isUserCompaction, majorCompaction);
CompactionRequest result = new CompactionRequest(candidateSelection);
if (!majorCompaction && !candidateSelection.isEmpty()) {
result.setOffPeak(mayUseOffPeak);
}
return result;
}

/**
@ -133,33 +141,25 @@ public class DefaultCompactionPolicy extends CompactionPolicy {
* @return A CompactSelection contains the expired store files as
* filesToCompact
*/
private CompactSelection selectExpiredStoreFiles(
CompactSelection candidates, long maxExpiredTimeStamp) {
List<StoreFile> filesToCompact = candidates.getFilesToCompact();
if (filesToCompact == null || filesToCompact.size() == 0)
return null;
private ArrayList<StoreFile> selectExpiredStoreFiles(
ArrayList<StoreFile> candidates, long maxExpiredTimeStamp) {
if (candidates == null || candidates.size() == 0) return null;
ArrayList<StoreFile> expiredStoreFiles = null;
boolean hasExpiredStoreFiles = false;
CompactSelection expiredSFSelection = null;

for (StoreFile storeFile : filesToCompact) {
for (StoreFile storeFile : candidates) {
if (storeFile.getReader().getMaxTimestamp() < maxExpiredTimeStamp) {
LOG.info("Deleting the expired store file by compaction: "
+ storeFile.getPath() + " whose maxTimeStamp is "
+ storeFile.getReader().getMaxTimestamp()
+ " while the max expired timestamp is " + maxExpiredTimeStamp);
if (!hasExpiredStoreFiles) {
if (expiredStoreFiles == null) {
expiredStoreFiles = new ArrayList<StoreFile>();
hasExpiredStoreFiles = true;
}
expiredStoreFiles.add(storeFile);
}
}

if (hasExpiredStoreFiles) {
expiredSFSelection = new CompactSelection(expiredStoreFiles);
}
return expiredSFSelection;
return expiredStoreFiles;
}

/**
@ -168,18 +168,16 @@ public class DefaultCompactionPolicy extends CompactionPolicy {
* exclude all files above maxCompactSize
* Also save all references. We MUST compact them
*/
private CompactSelection skipLargeFiles(CompactSelection candidates) {
private ArrayList<StoreFile> skipLargeFiles(ArrayList<StoreFile> candidates) {
int pos = 0;
while (pos < candidates.getFilesToCompact().size() &&
candidates.getFilesToCompact().get(pos).getReader().length() >
comConf.getMaxCompactSize() &&
!candidates.getFilesToCompact().get(pos).isReference()) {
while (pos < candidates.size() && !candidates.get(pos).isReference()
&& (candidates.get(pos).getReader().length() > comConf.getMaxCompactSize())) {
++pos;
}
if (pos > 0) {
LOG.debug("Some files are too large. Excluding " + pos
+ " files from compaction candidates");
candidates.clearSubList(0, pos);
candidates.subList(0, pos).clear();
}
return candidates;
}

@ -189,9 +187,8 @@ public class DefaultCompactionPolicy extends CompactionPolicy {
* @return filtered subset
* exclude all bulk load files if configured
*/
private CompactSelection filterBulk(CompactSelection candidates) {
candidates.getFilesToCompact().removeAll(Collections2.filter(
candidates.getFilesToCompact(),
private ArrayList<StoreFile> filterBulk(ArrayList<StoreFile> candidates) {
candidates.removeAll(Collections2.filter(candidates,
new Predicate<StoreFile>() {
@Override
public boolean apply(StoreFile input) {

@ -206,9 +203,9 @@ public class DefaultCompactionPolicy extends CompactionPolicy {
* @return filtered subset
* take upto maxFilesToCompact from the start
*/
private CompactSelection removeExcessFiles(CompactSelection candidates,
private ArrayList<StoreFile> removeExcessFiles(ArrayList<StoreFile> candidates,
boolean isUserCompaction, boolean isMajorCompaction) {
int excess = candidates.getFilesToCompact().size() - comConf.getMaxFilesToCompact();
int excess = candidates.size() - comConf.getMaxFilesToCompact();
if (excess > 0) {
if (isMajorCompaction && isUserCompaction) {
LOG.debug("Warning, compacting more than " + comConf.getMaxFilesToCompact() +

@ -216,8 +213,7 @@ public class DefaultCompactionPolicy extends CompactionPolicy {
} else {
LOG.debug("Too many admissible files. Excluding " + excess
+ " files from compaction candidates");
candidates.clearSubList(comConf.getMaxFilesToCompact(),
candidates.getFilesToCompact().size());
candidates.subList(comConf.getMaxFilesToCompact(), candidates.size()).clear();
}
}
return candidates;

@ -227,16 +223,14 @@ public class DefaultCompactionPolicy extends CompactionPolicy {
* @return filtered subset
* forget the compactionSelection if we don't have enough files
*/
private CompactSelection checkMinFilesCriteria(CompactSelection candidates) {
private ArrayList<StoreFile> checkMinFilesCriteria(ArrayList<StoreFile> candidates) {
int minFiles = comConf.getMinFilesToCompact();
if (candidates.getFilesToCompact().size() < minFiles) {
if (candidates.size() < minFiles) {
if(LOG.isDebugEnabled()) {
LOG.debug("Not compacting files because we only have " +
candidates.getFilesToCompact().size() +
" files ready for compaction. Need " + minFiles + " to initiate.");
LOG.debug("Not compacting files because we only have " + candidates.size() +
" files ready for compaction. Need " + minFiles + " to initiate.");
}
candidates.emptyFileList();
candidates.setOffPeak(false);
candidates.clear();
}
return candidates;
}

@ -271,25 +265,26 @@ public class DefaultCompactionPolicy extends CompactionPolicy {
* | | | | | | | | | | | |
* | | | | | | | | | | | |
*/
CompactSelection applyCompactionPolicy(CompactSelection candidates) throws IOException {
if (candidates.getFilesToCompact().isEmpty()) {
ArrayList<StoreFile> applyCompactionPolicy(
ArrayList<StoreFile> candidates, boolean mayUseOffPeak) throws IOException {
if (candidates.isEmpty()) {
return candidates;
}

// we're doing a minor compaction, let's see what files are applicable
int start = 0;
double ratio = comConf.getCompactionRatio();
if (candidates.isOffPeakCompaction()) {
if (mayUseOffPeak) {
ratio = comConf.getCompactionRatioOffPeak();
LOG.info("Running an off-peak compaction, selection ratio = " + ratio);
}

// get store file sizes for incremental compacting selection.
int countOfFiles = candidates.getFilesToCompact().size();
final int countOfFiles = candidates.size();
long[] fileSizes = new long[countOfFiles];
long[] sumSize = new long[countOfFiles];
for (int i = countOfFiles - 1; i >= 0; --i) {
StoreFile file = candidates.getFilesToCompact().get(i);
StoreFile file = candidates.get(i);
fileSizes[i] = file.getReader().length();
// calculate the sum of fileSizes[i,i+maxFilesToCompact-1) for algo
int tooFar = i + comConf.getMaxFilesToCompact() - 1;

@ -309,8 +304,9 @@ public class DefaultCompactionPolicy extends CompactionPolicy {
+ " files from " + countOfFiles + " candidates");
}

candidates = candidates.getSubList(start, countOfFiles);

if (start > 0) {
candidates.subList(0, start).clear();
}
return candidates;
}

@ -594,7 +594,7 @@ public class TestCompaction extends HBaseTestCase {
HStore store = (HStore) r.getStore(COLUMN_FAMILY);

Collection<StoreFile> storeFiles = store.getStorefiles();
Compactor tool = store.compactor;
Compactor tool = store.storeEngine.getCompactor();

List<Path> newFiles = tool.compactForTesting(storeFiles, false);

@ -611,7 +611,7 @@ public class TestCompaction extends HBaseTestCase {
stream.close();

try {
store.completeCompaction(storeFiles, origPath);
((HStore)store).moveFileIntoPlace(origPath);
} catch (Exception e) {
// The complete compaction should fail and the corrupt file should remain
// in the 'tmp' directory;

@ -635,7 +635,7 @@ public class TestCompaction extends HBaseTestCase {
}
store.triggerMajorCompaction();

CompactionRequest request = store.requestCompaction(Store.NO_PRIORITY, null);
CompactionRequest request = store.requestCompaction(Store.NO_PRIORITY, null).getRequest();
assertNotNull("Expected to receive a compaction request", request);
assertEquals(
"System-requested major compaction should not occur if there are too many store files",

@ -653,7 +653,7 @@ public class TestCompaction extends HBaseTestCase {
createStoreFile(r);
}
store.triggerMajorCompaction();
CompactionRequest request = store.requestCompaction(Store.PRIORITY_USER, null);
CompactionRequest request = store.requestCompaction(Store.PRIORITY_USER, null).getRequest();
assertNotNull("Expected to receive a compaction request", request);
assertEquals(
"User-requested major compaction should always occur, even if there are too many store files",

@ -680,7 +680,7 @@ public class TestCompaction extends HBaseTestCase {
}

CountDownLatch latch = new CountDownLatch(1);
TrackableCompactionRequest request = new TrackableCompactionRequest(r, (HStore) store, latch);
TrackableCompactionRequest request = new TrackableCompactionRequest(latch);
thread.requestCompaction(r, store, "test custom comapction", Store.PRIORITY_USER, request);
// wait for the latch to complete.
latch.await();

@ -698,16 +698,15 @@
* Constructor for a custom compaction. Uses the setXXX methods to update the state of the
* compaction before being used.
*/
public TrackableCompactionRequest(HRegion region, HStore store, CountDownLatch finished) {
super(region, store, Store.PRIORITY_USER);
public TrackableCompactionRequest(CountDownLatch finished) {
super();
this.done = finished;
}

@Override
public void run() {
super.run();
public void afterExecute() {
super.afterExecute();
this.done.countDown();
}
}

}

@ -39,7 +39,8 @@ import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.SmallTests;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.NoOpDataBlockEncoder;
import org.apache.hadoop.hbase.regionserver.compactions.CompactSelection;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactionPolicy;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
import org.apache.hadoop.hbase.util.Bytes;

@ -234,11 +235,11 @@ public class TestDefaultCompactSelection extends TestCase {
throws IOException {
store.forceMajor = forcemajor;
//Test Default compactions
CompactSelection result = store.compactionPolicy
.selectCompaction(candidates, false, isOffPeak, forcemajor);
List<StoreFile> actual = result.getFilesToCompact();
CompactionRequest result = ((DefaultCompactionPolicy)store.compactionPolicy).selectCompaction(
candidates, new ArrayList<StoreFile>(), false, isOffPeak, forcemajor);
List<StoreFile> actual = new ArrayList<StoreFile>(result.getFiles());
if (isOffPeak && !forcemajor) {
assertTrue(result.isOffPeakCompaction());
assertTrue(result.isOffPeak());
}
assertEquals(Arrays.toString(expected), Arrays.toString(getSizes(actual)));
store.forceMajor = false;

@ -56,11 +56,11 @@ import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.BloomFilterFactory;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;

@ -224,17 +224,19 @@ public class TestStore extends TestCase {
// by the compaction.
for (int i = 1; i <= storeFileNum; i++) {
// verify the expired store file.
CompactionRequest cr = this.store.requestCompaction();
CompactionContext compaction = this.store.requestCompaction();
CompactionRequest cr = compaction.getRequest();
// the first is expired normally.
// If not the first compaction, there is another empty store file,
List<StoreFile> files = new ArrayList<StoreFile>(cr.getFiles());
assertEquals(Math.min(i, 2), cr.getFiles().size());
for (int j = 0; j < cr.getFiles().size(); j++) {
assertTrue(cr.getFiles().get(j).getReader().getMaxTimestamp() < (edge
for (int j = 0; j < files.size(); j++) {
assertTrue(files.get(j).getReader().getMaxTimestamp() < (edge
.currentTimeMillis() - this.store.getScanInfo().getTtl()));
}
// Verify that the expired store file is compacted to an empty store file.
// Default compaction policy creates just one and only one compacted file.
StoreFile compactedFile = this.store.compact(cr).get(0);
StoreFile compactedFile = this.store.compact(compaction).get(0);
// It is an empty store file.
assertEquals(0, compactedFile.getReader().getEntries());

@ -53,7 +53,7 @@ public class PerfTestCompactionPolicies {

static final Log LOG = LogFactory.getLog(PerfTestCompactionPolicies.class);

private final CompactionPolicy cp;
private final DefaultCompactionPolicy cp;
private final int max;
private final int min;
private final float ratio;

@ -169,10 +169,11 @@
private List<StoreFile> runIteration(List<StoreFile> startingStoreFiles) throws IOException {

List<StoreFile> storeFiles = new ArrayList<StoreFile>(startingStoreFiles);
CompactSelection sel = cp.selectCompaction(storeFiles, false, false, false);
CompactionRequest req = cp.selectCompaction(
storeFiles, new ArrayList<StoreFile>(), false, false, false);
int newFileSize = 0;

List<StoreFile> filesToCompact = sel.getFilesToCompact();
Collection<StoreFile> filesToCompact = req.getFiles();

if (!filesToCompact.isEmpty()) {

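For orientation, a minimal sketch of how the reworked selection API fits together after this patch; this is not part of the commit, the helper class and the description strings are made up, and only methods shown in the diff above are used.

import java.io.IOException;
import java.util.Collection;
import java.util.List;

import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactionPolicy;

public class SelectionSketch {
  // Hypothetical helper: selection now returns a CompactionRequest directly,
  // replacing the CompactSelection object removed by this patch.
  static CompactionRequest selectForStore(DefaultCompactionPolicy policy,
      Collection<StoreFile> candidates, List<StoreFile> filesCompacting) throws IOException {
    // system-initiated, peak-hours, non-forced selection
    CompactionRequest request =
        policy.selectCompaction(candidates, filesCompacting, false, false, false);
    if (request.getFiles().isEmpty()) {
      return null; // nothing eligible to compact
    }
    // placeholder names, used only for logging
    request.setDescription("exampleRegion", "exampleFamily");
    return request;
  }
}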